Skip to main content
Websockets provide a persistent, bidirectional connection between your application and Allium, enabling real-time data delivery with lower latency than traditional HTTP requests. Allium offers two websocket capabilities:
  1. APIs over Websockets - Call any Allium API through a persistent websocket connection for faster response times
  2. Data Streams over Websockets - Subscribe to real-time blockchain data streams (wraps Kafka topics)
If you need delivery guarantees and at-least-once semantics, we recommend using Kafka or Pub/Sub directly. Websockets are best for real-time monitoring where occasional message loss is acceptable.

APIs over Websockets

APIs over websockets allow you to make API requests through a persistent connection, reducing latency by eliminating the connection overhead of individual HTTP requests.

Connection

Connect to the websocket API endpoint with your API key:
import websocket
import json

API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/api"

# Connect with X-API-KEY header
ws = websocket.create_connection(
    WS_URL,
    header={"X-API-KEY": API_KEY}
)

Request Format

Send requests as JSON messages with the following structure:
{
  "method": "POST",
  "id": "unique-request-id",
  "path": "/api/v1/developer/trading/hyperliquid/info/fills",
  "body": {
    "type": "userFills",
    "user": "0xD5994523d6B235862bE1790C06cFBE8547869449"
  }
}
Request Fields:
  • method (string, required): HTTP method - GET, POST, PUT, PATCH, or DELETE
  • id (string, required): Unique identifier for tracking the request/response
  • path (string, required): API endpoint path (e.g., /api/v1/developer/assets)
  • body (object, optional): Request parameters
    • For GET requests: URL query parameters sent as key-value pairs in body
    • For POST/PUT/PATCH/DELETE: Body parameters sent as key-value pairs in body

Response Format

Responses are returned as JSON messages:
{
  "id": "unique-request-id",
  "status": "success",
  "data": { ... }
}
Response Fields:
  • id (string): Matches the request ID
  • status (string): Either "success" or "error"
  • data (object): Response data or error message

Example: POST Request with Body Parameters

import websocket
import json
from datetime import datetime

API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/api"

# Connect
ws = websocket.create_connection(
    WS_URL,
    header={"X-API-KEY": API_KEY}
)

# Prepare POST request with body parameters
request = {
    "method": "POST",
    "id": f"request-{datetime.now().timestamp()}",
    "path": "/api/v1/developer/trading/hyperliquid/info/fills",
    "body": {
        "type": "userFills",
        "user": "0xD5994523d6B235862bE1790C06cFBE8547869449"
    }
}

# Send request
ws.send(json.dumps(request))

# Receive response
response = ws.recv()
response_json = json.loads(response)

print(response_json)

ws.close()

Example: GET Request with Query Parameters

# GET request - parameters in body become URL query parameters
request = {
    "method": "GET",
    "id": "get-assets-1",
    "path": "/api/v1/developer/assets",
    "body": {
        "blockchain": "ethereum",
        "limit": "10"
    }
}

ws.send(json.dumps(request))
response = ws.recv()
response_json = json.loads(response)

Concurrency & Rate Limits

  • Maximum 1,000 concurrent requests in flight per connection
  • However even with the above your account rate limits still apply, so you might not be able to reach 1,000 concurrent requests in flight per connection.
  • If you try to exceed this limit, you’ll receive an error response:
{
  "id": "your-request-id",
  "status": "error",
  "data": "max inflight requests reached, please wait for requests to finish"
}

Benefits

  • Lower latency: No connection overhead for each request
  • Request/response matching: Use unique IDs to track async responses

Data Streams over Websockets

Stream real-time blockchain data directly to your application. All Kafka topics are available via websockets for easier integration.

Connection

Connect to a data stream by specifying a topic:
from websocket import WebSocketApp

API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/stream?topic=ethereum.blocks"

ws = WebSocketApp(
    WS_URL,
    header={"X-API-KEY": API_KEY},
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close
)

ws.run_forever()

Available Topics

All Kafka topics are available. Use the format: {blockchain}.{data_type} Examples:
  • ethereum.blocks
  • ethereum.transactions
  • ethereum.logs
  • base.dex_trades
  • solana.transactions
  • hyperliquid.fills
  • bitcoin.blocks
See the Kafka documentation for the complete list of available topics and schemas.

Starting the Stream

After connecting, send a start message to begin receiving data:
def on_open(ws):
    print("Connected to WebSocket")
    
    # Start receiving messages
    start_message = {"action": "start"}
    ws.send(json.dumps(start_message))

Stopping the Stream

You can pause the stream without disconnecting by sending a stop message:
# Stop receiving messages (connection remains open)
stop_message = {"action": "stop"}
ws.send(json.dumps(stop_message))

# Resume later by sending start again
start_message = {"action": "start"}
ws.send(json.dumps(start_message))
This is useful when you need to temporarily pause data delivery, update filters, or reduce load without closing the websocket connection.

Filtering Data (Optional)

You can filter the stream to only receive messages matching specific criteria. Send a filter with the setFilter action before starting the stream:
def on_open(ws):
    # Set a filter before starting
    filter_message = {
        "action": "setFilter",
        "data": {
            "field": "to_address",
            "operator": "exists",
            "value": True
        }
    }
    ws.send(json.dumps(filter_message))
    
    # Start the stream
    start_message = {"action": "start"}
    ws.send(json.dumps(start_message))
The data field contains the filter definition. Filters support comparison operators (=, !=, >, >=, <, <=), the in operator for arrays, the exists operator, and compound AND/OR logic. Example with compound filter:
# Filter for large Ethereum transactions
filter_message = {
    "action": "setFilter",
    "data": {
        "op": "AND",
        "conditions": [
            {
                "field": "blockchain",
                "operator": "=",
                "value": "ethereum"
            },
            {
                "field": "value",
                "operator": ">",
                "value": "1000000000000000000"
            }
        ]
    }
}
ws.send(json.dumps(filter_message))
See the Filter Syntax documentation for complete details on all operators, nested conditions, and real-world examples.

Receiving Messages

Messages are delivered as JSON:
def on_message(ws, message):
    data = json.loads(message)
    print(f"Received: {data}")
    
    # Process your blockchain data
    process_data(data)

Complete Example: Streaming Ethereum Blocks

from websocket import WebSocketApp
import json

API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/stream?topic=ethereum.blocks"

def on_open(ws):
    print("✅ Connected to stream")
    start_message = {"action": "start"}
    ws.send(json.dumps(start_message))

def on_message(ws, message):
    data = json.loads(message)
    block_number = data.get('number')
    print(f"New block: {block_number}")

def on_error(ws, error):
    print(f"❌ Error: {error}")

def on_close(ws, close_status_code, close_msg):
    print(f"🔌 Disconnected: {close_msg}")

ws = WebSocketApp(
    WS_URL,
    header={"X-API-KEY": API_KEY},
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close
)

ws.run_forever()

Connection Management

Compression

Websocket compression is supported and automatically negotiated:
# Python websocket-client handles compression automatically
ws = websocket.create_connection(
    WS_URL,
    header={
        "X-API-KEY": API_KEY,
        "Sec-WebSocket-Extensions": "permessage-deflate"
    }
)

Reconnection

Implement reconnection logic for production applications:
import time

def connect_with_retry(url, headers, max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            ws = websocket.create_connection(url, header=headers)
            print("Connected successfully")
            return ws
        except Exception as e:
            retries += 1
            wait_time = min(2 ** retries, 30)
            print(f"Connection failed. Retrying in {wait_time}s...")
            time.sleep(wait_time)
    
    raise Exception("Failed to connect after max retries")

Error Handling

Handle common websocket errors:
try:
    ws.send(json.dumps(request))
    response = ws.recv()
except websocket.WebSocketTimeoutException:
    print("Request timeout")
except websocket.WebSocketConnectionClosedException:
    print("Connection closed unexpectedly")
    # Implement reconnection
except Exception as e:
    print(f"Error: {e}")

Authentication

Both websocket types require an API key passed in the connection headers:
header = {"X-API-KEY": "your_api_key_here"}
  • APIs over Websockets: Requires it to be enabled on your account
  • Streams over Websockets: Requires it to be enabled and topic access
Contact account representative to enable websocket features on your account.

Next Steps