Integrating Allium’s Pub/Sub data streams into your systems gives you seamless access to real-time blockchain data. This guide walks you through setting up and managing Pub/Sub streams for your organization.
Subscription Types
Allium supports two types of Pub/Sub subscriptions:

- **Pull Subscription**: your system pulls data from the stream. Requires a service account or user authorized to pull data.
- **Push Subscription**: data is sent directly to your webhook endpoint. Requires a valid webhook URL.
Setup Guide
Request Access to Data Streams
Email support@allium.so with the following information:

- Chains and schemas you want to access
- For Pull Subscriptions: email address for authentication
- For Push Subscriptions: webhook URL to receive data
Our team will confirm once your data streams are ready to use.
Receive the Data
Pull Subscription
Push Subscription
Configure your code to connect and pull data from the Pub/Sub subscription:

```python
from google.cloud import pubsub_v1

project_id = "degenalytics"
subscription_id = "<your subscription ID>"


def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received data: {message.data}")
    message.ack()


def pull_messages():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    with subscriber:
        print(f"Listening for messages on {subscription_path}..\n")
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=handle_message
        )
        try:
            # Block until the stream fails or the process is interrupted.
            streaming_pull_future.result()
        except Exception:
            streaming_pull_future.cancel()  # trigger shutdown
            streaming_pull_future.result()  # block until shutdown completes


if __name__ == "__main__":
    pull_messages()
```
The data will arrive in compressed format:

```
KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPh...
```
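Note that the sample above is base64-encoded for display; in a pull subscription, `message.data` contains the raw compressed bytes. Decoding the first characters of the sample exposes the zstandard magic number described in the decompression section of this guide:

```python
import base64

# First characters of the displayed sample; decoding exposes the raw bytes.
prefix = base64.b64decode("KLUv/QBY")
print(prefix[:4])  # b'(\xb5/\xfd' -> the zstandard magic number
```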
Learn more about pull subscriptions in the Google Pub/Sub docs.

Data will be automatically sent to your configured webhook endpoint in this format:

```json
{
  "message": {
    "data": "KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPh...",
    "messageId": "11996745375327517",
    "message_id": "11996745375327517",
    "publishTime": "2024-08-20T16:16:06.44Z",
    "publish_time": "2024-08-20T16:16:06.44Z"
  },
  "subscription": "projects/degenalytics/subscriptions/allium_app.portal.solana.nonvoting_transactions.push"
}
```
Learn more about push subscriptions in the Google Pub/Sub docs.
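Per the Pub/Sub push contract, the `data` field in that envelope is base64-encoded, so your endpoint must decode it before decompressing. A minimal receiver sketch using only the standard library (the port and response code are assumptions; any framework works the same way):

```python
import base64
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def decode_push_message(body: bytes) -> bytes:
    """Extract the compressed payload from a Pub/Sub push envelope.

    The "data" field is base64-encoded per the Pub/Sub push contract.
    """
    envelope = json.loads(body)
    return base64.b64decode(envelope["message"]["data"])


class PushHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = decode_push_message(self.rfile.read(length))
        # payload is still compressed at this point; see "Decompress the Data".
        print(f"Received {len(payload)} compressed bytes")
        self.send_response(204)  # 2xx acknowledges; Pub/Sub redelivers otherwise
        self.end_headers()


# To run the receiver (blocks forever):
# HTTPServer(("", 8080), PushHandler).serve_forever()
```

Returning any 2xx status acknowledges the message; non-2xx responses cause Pub/Sub to retry delivery.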
Decompress the Data
Allium’s stream data is compressed for optimization. We use three different compression methods depending on the data characteristics.

Detect Compression Method

Check the first few bytes of the data to determine the compression method:

| Method | First Bytes |
| --- | --- |
| gzip | `\x1f\x8b` |
| zstandard | `\x28\xb5\x2f\xfd` |
| lz4 | `\x04\x22\x4d\x18` |
Decompression Scripts

```python
import gzip

import lz4.frame
import zstandard
from google.cloud import pubsub_v1

GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"
ZSTD_PREFIX_MAGIC_NUMBERS = b"\x28\xb5\x2f\xfd"
LZ4_PREFIX_MAGIC_NUMBERS = b"\x04\x22\x4d\x18"


def auto_decompress(data):
    if data.startswith(GZIP_PREFIX_MAGIC_NUMBERS):
        try:
            return gzip.decompress(data)
        except OSError as e:
            print(f"Error decompressing gzip data: {e}")
    elif data.startswith(ZSTD_PREFIX_MAGIC_NUMBERS):
        try:
            decompressor = zstandard.ZstdDecompressor()
            stream_reader = decompressor.stream_reader(data)
            decompressed = stream_reader.read()
            stream_reader.close()
            return decompressed
        except Exception as e:
            print(f"Error decompressing zstd data: {e}")
    elif data.startswith(LZ4_PREFIX_MAGIC_NUMBERS):
        try:
            return lz4.frame.decompress(data)
        except lz4.frame.LZ4FrameError as e:
            print(f"Error decompressing lz4 data: {e}")
    # Unknown format or decompression failure: return the data unchanged.
    return data


def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    data = auto_decompress(message.data)
    print(f"Received data: {str(data)}")
    message.ack()
```
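To sanity-check the magic-number detection locally without a live subscription, you can round-trip a stand-in payload. A minimal sketch using gzip, since it ships with the standard library (zstandard and lz4 behave analogously); the payload contents are hypothetical:

```python
import gzip

GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"

payload = b'{"slot": 1, "signature": "abc"}'  # stand-in record; real schemas vary
compressed = gzip.compress(payload)

# The compressed bytes start with the gzip magic number from the table above,
# so the detection branch would select gzip.decompress.
assert compressed.startswith(GZIP_PREFIX_MAGIC_NUMBERS)
assert gzip.decompress(compressed) == payload
print("gzip round-trip OK")
```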