Integrating Allium’s Pub/Sub data streams into your systems allows you to access real-time blockchain data seamlessly. This guide walks you through setting up and managing Pub/Sub streams for your organization.

Subscription Types

Allium supports two types of Pub/Sub subscriptions:

Pull Subscription

Your system pulls data from the stream. Requires a service account or user authorized to pull data.

Push Subscription

Data is sent directly to your webhook endpoint. Requires a valid webhook URL.

Setup Guide

1. Request Access to Data Streams

Email support@allium.so with the following information:
  • Chains and schemas you want to access
  • For Pull Subscriptions: Email address for authentication
  • For Push Subscriptions: Webhook URL to receive data
Our team will confirm once your data streams are ready to use.
2. Receive the Data

For a pull subscription, configure your code to connect and pull data from the Pub/Sub subscription:
from google.cloud import pubsub_v1

project_id = "degenalytics"
subscription_id = "<your subscription ID>"

def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received data: {message.data}")
    message.ack()

def pull_messages():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    with subscriber:
        print(f"Listening for messages on {subscription_path}..\n")
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=handle_message
        )
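        try:
            # Block the main thread while messages are handled in the background.
            streaming_pull_future.result()
        except KeyboardInterrupt:
            # Stop on Ctrl+C; other errors propagate to the caller.
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown completes.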
                
if __name__ == "__main__":
    pull_messages()
The message data is compressed. Rendered as base64, a payload looks like this:
KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPh...
Learn more about pull subscriptions in the Google Pub/Sub docs.
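
For a push subscription, your webhook receives each message as an HTTP POST. The sketch below shows one way to pull the raw bytes out of the request body. It assumes the stream uses Google Pub/Sub's default wrapped JSON envelope, in which the payload is base64-encoded under `message.data`; this guide does not confirm that detail, so treat it as an assumption. The function name `extract_push_payload` and the sample body are hypothetical.

```python
import base64
import json


def extract_push_payload(body: bytes) -> bytes:
    """Return the raw message bytes from a Pub/Sub push request body.

    Assumes the default wrapped envelope:
    {"message": {"data": "<base64>", ...}, "subscription": "..."}
    """
    envelope = json.loads(body)
    message = envelope["message"]
    # "data" can be absent when a message carries only attributes.
    return base64.b64decode(message.get("data", ""))


# Hypothetical request body, built locally for illustration only.
sample_body = json.dumps(
    {
        "message": {
            "data": base64.b64encode(b"hello").decode("ascii"),
            "messageId": "1",
        },
        "subscription": "projects/example/subscriptions/example-sub",
    }
).encode("utf-8")

payload = extract_push_payload(sample_body)
print(payload)
```

The extracted bytes are still compressed, so they go through the same decompression step as pulled messages.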
3. Decompress the Data

Allium compresses stream data to reduce message size. One of three compression methods (gzip, zstandard, or lz4) is used, depending on the characteristics of the data.

Detect Compression Method

Check the first few bytes of the data to determine the compression method:
Method       First Bytes
gzip         \x1f\x8b
zstandard    \x28\xb5\x2f\xfd
lz4          \x04\x22\x4d\x18
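
As a quick illustration of the prefix check, the sketch below maps the first bytes of a payload to a method name. It assumes the sample payload shown in step 2 is base64 text, and uses only its first eight characters; the helper name `detect_compression` is hypothetical.

```python
import base64
from typing import Optional

# Magic-number prefixes from the table above.
MAGIC_PREFIXES = {
    "gzip": b"\x1f\x8b",
    "zstandard": b"\x28\xb5\x2f\xfd",
    "lz4": b"\x04\x22\x4d\x18",
}


def detect_compression(data: bytes) -> Optional[str]:
    """Return the compression method name, or None if no prefix matches."""
    for method, prefix in MAGIC_PREFIXES.items():
        if data.startswith(prefix):
            return method
    return None


# First eight base64 characters of the sample payload from step 2.
sample_prefix = base64.b64decode("KLUv/QBY")

print(sample_prefix[:4].hex())            # 28b52ffd
print(detect_compression(sample_prefix))  # zstandard
```

Under that assumption, the sample payload is zstandard-compressed.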

Decompression Scripts

import gzip

import lz4.frame
import zstandard
from google.cloud import pubsub_v1

GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"
ZSTD_PREFIX_MAGIC_NUMBERS = b"\x28\xb5\x2f\xfd"
LZ4_PREFIX_MAGIC_NUMBERS = b"\x04\x22\x4d\x18"


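def auto_decompress(data: bytes) -> bytes:
    """Decompress data based on its magic-number prefix.

    Returns the input unchanged if no known prefix matches or if
    decompression fails.
    """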
    if data.startswith(GZIP_PREFIX_MAGIC_NUMBERS):
        try:
            return gzip.decompress(data)
        except OSError as e:
            print(f"Error decompressing gzip data: {e}")
    elif data.startswith(ZSTD_PREFIX_MAGIC_NUMBERS):
        try:
            decompressor = zstandard.ZstdDecompressor()
            stream_reader = decompressor.stream_reader(data)
            decompressed = stream_reader.read()
            stream_reader.close()
            return decompressed
        except Exception as e:
            print(f"Error decompressing zstd data: {e}")
    elif data.startswith(LZ4_PREFIX_MAGIC_NUMBERS):
        try:
            return lz4.frame.decompress(data)
        except lz4.frame.LZ4FrameError as e:
            print(f"Error decompressing lz4 data: {e}")
    return data


def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    data = auto_decompress(message.data)
    print(f"Received data: {str(data)}")
    message.ack()
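
To sanity-check the prefix-based logic without a live subscription, one option is to round-trip a locally compressed payload. The sketch below covers only the gzip branch, so it runs with the standard library alone. `decompress_if_gzip` is a hypothetical stand-in for `auto_decompress`, and the record and its JSON encoding are made up for illustration, not Allium's actual message schema.

```python
import gzip
import json

GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"


def decompress_if_gzip(data: bytes) -> bytes:
    """Gzip-only stand-in for auto_decompress, for local testing."""
    if data.startswith(GZIP_PREFIX_MAGIC_NUMBERS):
        return gzip.decompress(data)
    return data


# Hypothetical payload; the real message schema is not specified here.
record = {"chain": "example", "block_number": 1}
raw = json.dumps(record).encode("utf-8")
compressed = gzip.compress(raw)

# gzip output starts with the magic bytes.
print(compressed[:2] == GZIP_PREFIX_MAGIC_NUMBERS)
# Compressed input is restored to the original bytes.
print(decompress_if_gzip(compressed) == raw)
# Uncompressed input passes through unchanged.
print(decompress_if_gzip(raw) == raw)
```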
I