Websockets provide a persistent, bidirectional connection between your application and Allium, enabling real-time data delivery with lower latency than traditional HTTP requests.
Allium offers two websocket capabilities:
APIs over Websockets - Call any Allium API through a persistent websocket connection for faster response times
Data Streams over Websockets - Subscribe to real-time blockchain data streams (wraps Kafka topics)
If you need delivery guarantees and at-least-once semantics, we recommend using Kafka or Pub/Sub directly. Websockets are best for real-time monitoring where occasional message loss is acceptable.
APIs over Websockets
APIs over websockets allow you to make API requests through a persistent connection, reducing latency by eliminating the connection overhead of individual HTTP requests.
Connection
Connect to the websocket API endpoint with your API key:
import websocket
import json
API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/api"
# Connect with X-API-KEY header
ws = websocket.create_connection(
    WS_URL,
    header={"X-API-KEY": API_KEY}
)
Send requests as JSON messages with the following structure:
{
  "method": "POST",
  "id": "unique-request-id",
  "path": "/api/v1/developer/trading/hyperliquid/info/fills",
  "body": {
    "type": "userFills",
    "user": "0xD5994523d6B235862bE1790C06cFBE8547869449"
  }
}
Request Fields:
method (string, required): HTTP method - GET, POST, PUT, PATCH, or DELETE
id (string, required): Unique identifier for tracking the request/response
path (string, required): API endpoint path (e.g., /api/v1/developer/assets)
body (object, optional): Request parameters
For GET requests: URL query parameters sent as key-value pairs in body
For POST/PUT/PATCH/DELETE: Body parameters sent as key-value pairs in body
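The request envelope described above can be assembled with a small helper. This is a minimal sketch, not part of any Allium SDK: `build_request` and `ALLOWED_METHODS` are hypothetical names, and generating the `id` with `uuid4` is an assumption (any unique string should serve the same purpose).

```python
import json
import uuid

# HTTP methods listed in the request field description
ALLOWED_METHODS = {"GET", "POST", "PUT", "PATCH", "DELETE"}

def build_request(method, path, body=None, request_id=None):
    """Return a JSON string for one request message (hypothetical helper)."""
    method = method.upper()
    if method not in ALLOWED_METHODS:
        raise ValueError(f"unsupported method: {method}")
    message = {
        "method": method,
        # Assumption: a random UUID is unique enough to track the request
        "id": request_id or str(uuid.uuid4()),
        "path": path,
    }
    # body is optional, so include it only when parameters are supplied
    if body is not None:
        message["body"] = body
    return json.dumps(message)
```

The returned string can be passed directly to `ws.send(...)` on an open connection.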
Responses are returned as JSON messages:
{
  "id": "unique-request-id",
  "status": "success",
  "data": { ... }
}
Response Fields:
id (string): Matches the request ID
status (string): Either "success" or "error"
data (object): Response data or error message
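One way to consume these fields is to return `data` on success and raise on `"error"`. The sketch below is illustrative only: `unwrap_response` and `AlliumWsError` are hypothetical names, and it assumes every response carries the three fields listed above.

```python
import json

class AlliumWsError(Exception):
    """Raised when a response has status "error" (hypothetical class)."""

    def __init__(self, request_id, detail):
        super().__init__(f"request {request_id} failed: {detail}")
        self.request_id = request_id
        self.detail = detail

def unwrap_response(raw):
    """Parse one response message and return (id, data), raising on errors."""
    message = json.loads(raw)
    if message.get("status") == "error":
        # For errors, the data field holds the error message
        raise AlliumWsError(message.get("id"), message.get("data"))
    return message.get("id"), message.get("data")
```

Returning the `id` alongside the data lets the caller match the response to the request that produced it.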
Example: POST Request with Body Parameters
import websocket
import json
from datetime import datetime
API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/api"
# Connect
ws = websocket.create_connection(
    WS_URL,
    header={"X-API-KEY": API_KEY}
)
# Prepare POST request with body parameters
request = {
    "method": "POST",
    "id": f"request-{datetime.now().timestamp()}",
    "path": "/api/v1/developer/trading/hyperliquid/info/fills",
    "body": {
        "type": "userFills",
        "user": "0xD5994523d6B235862bE1790C06cFBE8547869449"
    }
}
# Send request
ws.send(json.dumps(request))
# Receive response
response = ws.recv()
response_json = json.loads(response)
print(response_json)
ws.close()
Example: GET Request with Query Parameters
# GET request - parameters in body become URL query parameters
request = {
    "method": "GET",
    "id": "get-assets-1",
    "path": "/api/v1/developer/assets",
    "body": {
        "blockchain": "ethereum",
        "limit": "10"
    }
}
ws.send(json.dumps(request))
response = ws.recv()
response_json = json.loads(response)
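To make the mapping concrete, the sketch below builds the HTTP URL that a GET message like the one above would presumably correspond to. This is an assumption about the server-side translation, inferred from the field descriptions; `equivalent_http_url` is a hypothetical helper, and the `https://api.allium.so` base URL is inferred from the websocket host rather than stated in this page.

```python
from urllib.parse import urlencode

def equivalent_http_url(request, base_url="https://api.allium.so"):
    """Return the HTTP URL a GET websocket request would map to (illustrative)."""
    # Key-value pairs in body become the URL query string
    query = urlencode(request.get("body") or {})
    url = base_url + request["path"]
    return f"{url}?{query}" if query else url

example = {
    "method": "GET",
    "id": "get-assets-1",
    "path": "/api/v1/developer/assets",
    "body": {"blockchain": "ethereum", "limit": "10"},
}
print(equivalent_http_url(example))
```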
Concurrency & Rate Limits
Each connection supports a maximum of 1,000 concurrent requests in flight.
Your account rate limits still apply, so you may hit them before reaching 1,000 in-flight requests on a connection.
If you exceed the in-flight limit, you'll receive an error response:
{
  "id": "your-request-id",
  "status": "error",
  "data": "max inflight requests reached, please wait for requests to finish"
}
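A client can avoid triggering this error by tracking its own in-flight requests. The following is a minimal sketch under the assumption that a request counts as in flight from the moment it is sent until its response arrives; `InflightTracker` is a hypothetical helper, not an Allium API, and it does not account for account-level rate limits.

```python
import threading

MAX_INFLIGHT = 1000  # per-connection limit stated above

class InflightTracker:
    """Tracks request IDs that are awaiting a response (hypothetical helper)."""

    def __init__(self, limit=MAX_INFLIGHT):
        self._limit = limit
        self._pending = set()
        self._lock = threading.Lock()

    def try_acquire(self, request_id):
        """Reserve a slot before sending; return False if the limit is reached."""
        with self._lock:
            if len(self._pending) >= self._limit:
                return False
            self._pending.add(request_id)
            return True

    def release(self, request_id):
        """Free the slot once the response for request_id has arrived."""
        with self._lock:
            self._pending.discard(request_id)

    def inflight(self):
        """Return the number of requests currently awaiting a response."""
        with self._lock:
            return len(self._pending)
```

Call `try_acquire` before `ws.send(...)` and `release` when the response with the matching `id` is received; if `try_acquire` returns `False`, wait for outstanding responses before sending more.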
Benefits
Lower latency: No connection overhead for each request
Request/response matching: Use unique IDs to track async responses
Data Streams over Websockets
Stream real-time blockchain data directly to your application. All Kafka topics are available via websockets for easier integration.
Connection
Connect to a data stream by specifying a topic:
from websocket import WebSocketApp
API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/stream?topic=ethereum.blocks"
# on_open, on_message, on_error, and on_close are callback functions you define
# (see the complete example below)
ws = WebSocketApp(
    WS_URL,
    header={"X-API-KEY": API_KEY},
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close
)
ws.run_forever()
Available Topics
All Kafka topics are available. Use the format: {blockchain}.{data_type}
Examples:
ethereum.blocks
ethereum.transactions
ethereum.logs
base.dex_trades
solana.transactions
hyperliquid.fills
bitcoin.blocks
See the Kafka documentation for the complete list of available topics and schemas.
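The topic format can be turned into a stream URL with a short helper. This is a sketch only: `stream_url` and `STREAM_BASE` are hypothetical names, and whether a given topic exists or is enabled for your account still has to be checked against the Kafka documentation.

```python
from urllib.parse import urlencode

# Stream endpoint used in the connection example above
STREAM_BASE = "wss://api.allium.so/api/v1/developer/ws/stream"

def stream_url(blockchain, data_type):
    """Build the websocket URL for a {blockchain}.{data_type} topic."""
    topic = f"{blockchain}.{data_type}"
    return f"{STREAM_BASE}?{urlencode({'topic': topic})}"

print(stream_url("base", "dex_trades"))
```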
Starting the Stream
After connecting, send a start message to begin receiving data:
def on_open(ws):
    print("Connected to WebSocket")
    # Start receiving messages
    start_message = {"action": "start"}
    ws.send(json.dumps(start_message))
Stopping the Stream
You can pause the stream without disconnecting by sending a stop message:
# Stop receiving messages (connection remains open)
stop_message = {"action": "stop"}
ws.send(json.dumps(stop_message))
# Resume later by sending start again
start_message = {"action": "start"}
ws.send(json.dumps(start_message))
This is useful when you need to temporarily pause data delivery, update filters, or reduce load without closing the websocket connection.
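As an illustration of the filter-update case, the sketch below produces the three control messages in order: stop, `setFilter` (the action described under Filtering Data), then start. It assumes the server accepts `setFilter` while the stream is paused, which this page implies but does not state outright; `filter_update_messages` is a hypothetical helper.

```python
import json

def filter_update_messages(new_filter):
    """Return the JSON messages to pause, replace the filter, and resume."""
    return [
        json.dumps({"action": "stop"}),
        # Assumption: setFilter is accepted while the stream is stopped
        json.dumps({"action": "setFilter", "data": new_filter}),
        json.dumps({"action": "start"}),
    ]

messages = filter_update_messages(
    {"field": "to_address", "operator": "exists", "value": True}
)
```

Each string in `messages` would then be sent in order with `ws.send(...)` on the open stream connection.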
Filtering Data (Optional)
You can filter the stream to only receive messages matching specific criteria. Send a filter with the setFilter action before starting the stream:
def on_open(ws):
    # Set a filter before starting
    filter_message = {
        "action": "setFilter",
        "data": {
            "field": "to_address",
            "operator": "exists",
            "value": True
        }
    }
    ws.send(json.dumps(filter_message))
    # Start the stream
    start_message = {"action": "start"}
    ws.send(json.dumps(start_message))
The data field contains the filter definition. Filters support comparison operators (=, !=, >, >=, <, <=), the in operator for arrays, the exists operator, and compound AND/OR logic.
Example with compound filter:
# Filter for large Ethereum transactions
filter_message = {
    "action": "setFilter",
    "data": {
        "op": "AND",
        "conditions": [
            {
                "field": "blockchain",
                "operator": "=",
                "value": "ethereum"
            },
            {
                "field": "value",
                "operator": ">",
                "value": "1000000000000000000"
            }
        ]
    }
}
ws.send(json.dumps(filter_message))
See the Filter Syntax documentation for complete details on all operators, nested conditions, and real-world examples.
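Nested filter dictionaries are easy to mistype, so a few builder functions can help. This is a sketch based only on the shapes shown in the examples above: `condition`, `all_of`, `any_of`, and `set_filter_message` are hypothetical helpers, and the `"OR"` value for `op` is an assumption that mirrors the `"AND"` example; check the Filter Syntax documentation for the authoritative format.

```python
import json

def condition(field, operator, value):
    """Build a single-field condition such as {"field": ..., "operator": ...}."""
    return {"field": field, "operator": operator, "value": value}

def all_of(*conditions):
    """Combine conditions so that every one must match (AND)."""
    return {"op": "AND", "conditions": list(conditions)}

def any_of(*conditions):
    """Combine conditions so that at least one must match (assumed "OR")."""
    return {"op": "OR", "conditions": list(conditions)}

def set_filter_message(filter_def):
    """Wrap a filter definition in a setFilter action and serialize it."""
    return json.dumps({"action": "setFilter", "data": filter_def})

# Same compound filter as the example above
large_eth_transfers = all_of(
    condition("blockchain", "=", "ethereum"),
    condition("value", ">", "1000000000000000000"),
)
```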
Receiving Messages
Messages are delivered as JSON:
def on_message(ws, message):
    data = json.loads(message)
    print(f"Received: {data}")
    # Process your blockchain data
    # (process_data is a placeholder for your own handler)
    process_data(data)
Complete Example: Streaming Ethereum Blocks
from websocket import WebSocketApp
import json

API_KEY = "your_api_key_here"
WS_URL = "wss://api.allium.so/api/v1/developer/ws/stream?topic=ethereum.blocks"

def on_open(ws):
    print("✅ Connected to stream")
    start_message = {"action": "start"}
    ws.send(json.dumps(start_message))

def on_message(ws, message):
    data = json.loads(message)
    block_number = data.get('number')
    print(f"New block: {block_number}")

def on_error(ws, error):
    print(f"❌ Error: {error}")

def on_close(ws, close_status_code, close_msg):
    print(f"🔌 Disconnected: {close_msg}")

ws = WebSocketApp(
    WS_URL,
    header={"X-API-KEY": API_KEY},
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close
)
ws.run_forever()
Connection Management
Compression
Websocket compression is supported and automatically negotiated:
# Python websocket-client handles compression automatically
ws = websocket.create_connection(
    WS_URL,
    header={
        "X-API-KEY": API_KEY,
        "Sec-WebSocket-Extensions": "permessage-deflate"
    }
)
Reconnection
Implement reconnection logic for production applications:
import time
import websocket

def connect_with_retry(url, headers, max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            ws = websocket.create_connection(url, header=headers)
            print("Connected successfully")
            return ws
        except Exception as e:
            retries += 1
            # Exponential backoff, capped at 30 seconds
            wait_time = min(2 ** retries, 30)
            print(f"Connection failed ({e}). Retrying in {wait_time}s...")
            time.sleep(wait_time)
    raise Exception("Failed to connect after max retries")
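The wait times produced by this retry loop can be inspected in isolation. The sketch below reproduces the same `min(2 ** retries, 30)` schedule and adds optional random jitter, a common addition that is not part of the example above, so that many clients do not reconnect at the same instant. `backoff_delays` and its parameters are hypothetical names.

```python
import random

def backoff_delays(max_retries=5, base=2, cap=30, jitter=0.0, rng=random.random):
    """Return the wait time in seconds before each retry attempt."""
    delays = []
    for attempt in range(1, max_retries + 1):
        # Exponential growth, capped so waits never exceed `cap` seconds
        delay = min(base ** attempt, cap)
        if jitter:
            # Optional: add up to `jitter` * delay of random extra wait
            delay += delay * jitter * rng()
        delays.append(delay)
    return delays

print(backoff_delays())  # [2, 4, 8, 16, 30]
```

With the defaults, five failed attempts wait 60 seconds in total before the loop gives up.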
Error Handling
Handle common websocket errors:
try:
    ws.send(json.dumps(request))
    response = ws.recv()
except websocket.WebSocketTimeoutException:
    print("Request timeout")
except websocket.WebSocketConnectionClosedException:
    print("Connection closed unexpectedly")
    # Implement reconnection
except Exception as e:
    print(f"Error: {e}")
Authentication
Both websocket types require an API key passed in the connection headers:
header = {"X-API-KEY": "your_api_key_here"}
APIs over Websockets: Must be enabled on your account
Streams over Websockets: Must be enabled on your account, and you need access to each topic you subscribe to
Contact your account representative to enable websocket features on your account.
Next Steps
Filter Syntax: Learn how to filter streams with complex conditions
Kafka Streams: Production-grade streaming with delivery guarantees
API Reference: Explore available API endpoints