Integrating Allium’s Pub/Sub data streams into your systems is a straightforward process that allows you to access real-time data seamlessly. Follow the steps below to create and manage Pub/Sub streams for your organization.

You will need to set up a subscription to start receiving data. Allium supports two types of subscriptions:

  • Pull Subscription: This allows your system to pull data from the stream. When setting up a pull subscription, provide a service account or user that will be authorized to pull data.
  • Push Subscription: This option sends data directly to your specified webhook endpoint. When setting up a push subscription, ensure you provide the correct webhook URL where the data will be sent.

Step 1: Request access to data streams

To get started, email support@allium.so specifying the chains and schemas you’re interested in accessing.

For Pull Subscriptions: Include the email address you want to use for authentication. For Push Subscriptions: Provide the webhook URL where you want to receive the data.

Our team will confirm once your data streams are set up and ready to use.

Step 2: Receive the data

In order to receive data, you will need to configure your code to connect and pull data from the Pub/Sub subscription. Here’s an example Python snippet to pull the data:

from google.cloud import pubsub_v1



project_id = "degenalytics"

subscription_id = "<your subscription ID>"



def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:

    print(f"Received data: {message.data}")

    message.ack()



def pull_messages():

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(project_id, subscription_id)



    with subscriber:

        print(f"Listening for messages on {subscription_path}..\n")

        streaming_pull_future = subscriber.subscribe(

            subscription_path, callback=handle_message

        )

        while True:

            try:

                streaming_pull_future.result()

            except Exception:

                streaming_pull_future.cancel()

                streaming_pull_future.result()

                

if __name__ == "__main__":

    pull_messages()

The data should look something like:

KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPh...

You can read more about pull subscriptions and what each field means in the Google Pub/Sub docs.

Step 3: Decompress the data

Data in Allium’s streams are typically compressed for optimization reasons. We employ 3 different compression methods, depending on the shape, size and frequency of data coming through the stream:

The easiest way to determine which compression method was used, is to check the first few bytes of the data. Here are the bytes that the data will start with depending on the compression method used:

MethodFirst Bytes
gzip\x1f\x8b
zstandard\x28\xb5\x2f\xfd
lz4\x04\x22\x4d\x18

Here are sample scripts that will help you decompress the data if necessary:

import gzip



import lz4.frame

import zstandard

from google.cloud import pubsub_v1



GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"

ZSTD_PREFIX_MAGIC_NUMBERS = b"\x28\xb5\x2f\xfd"

LZ4_PREFIX_MAGIC_NUMBERS = b"\x04\x22\x4d\x18"





def auto_decompress(data):

    if data.startswith(GZIP_PREFIX_MAGIC_NUMBERS):

        try:

            return gzip.decompress(data)

        except OSError as e:

            print(f"Error decompressing gzip data: {e}")

    elif data.startswith(ZSTD_PREFIX_MAGIC_NUMBERS):

        try:

            decompressor = zstandard.ZstdDecompressor()

            stream_reader = decompressor.stream_reader(data)

            decompressed = stream_reader.read()

            stream_reader.close()

            return decompressed

        except Exception as e:

            print(f"Error decompressing zstd data: {e}")

    elif data.startswith(LZ4_PREFIX_MAGIC_NUMBERS):

        try:

            return lz4.frame.decompress(data)

        except lz4.frame.LZ4FrameError as e:

            print(f"Error decompressing lz4 data: {e}")

    return data





def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:

    data = auto_decompress(message.data)

    print(f"Received data: {str(data)}")

    message.ack()