Getting started with Google Pub/Sub

Integrating Allium's Pub/Sub data streams into your systems is a straightforward process that allows you to access real-time data seamlessly. Follow the steps below to create and manage Pub/Sub streams for your organization.

Step 1: Access the Integration Portal

To begin, visit the Allium Pub/Sub Integration page. Here, you can create and manage the Pub/Sub streams for the various types of data that Allium provides.

Step 2: Set Up Your Pub/Sub Subscription

Within the integration page, you can set up Pub/Sub streams tailored to the specific data your organization needs. Allium offers a variety of data types, including:

  • Raw Blockchain Data - e.g. blocks and transactions.

  • Decoded Data - e.g. decoded logs and traces.

  • Enriched Models - e.g. NFT data or DEX data.

You will need to set up a subscription to start receiving data. Allium supports two types of subscriptions:

  • Pull Subscription: This allows your system to pull data from the stream. When setting up a pull subscription, provide a service account or user that will be authorized to pull data.

  • Push Subscription: This option sends data directly to your specified webhook endpoint. When setting up a push subscription, ensure you provide the correct webhook URL where the data will be sent.

Step 3: Receive the data

In order to receive data, you will need to configure your code to connect and pull data from the Pub/Sub subscription. Here's an example Python snippet to pull the data:

from google.cloud import pubsub_v1

project_id = "degenalytics"
subscription_id = "<your subscription ID>"

def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received data: {message.data}")
    message.ack()

def pull_messages():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    with subscriber:
        print(f"Listening for messages on {subscription_path}..\n")
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=handle_message
        )
        while True:
            try:
                streaming_pull_future.result()
            except Exception:
                streaming_pull_future.cancel()
                streaming_pull_future.result()
                
if __name__ == "__main__":
    pull_messages()

The data should look something like:

KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPhJQExASwB9eGn/1X1vIbf+CgYdgYpYIeFNvfARLQH8EMWvjFJfmbc8+oZ4lvu1mtVUx5uJ1kJCKiqmfKw8sezc9yuTy0gKICgqKoDxP7N86oDCDxXQFUH+O59Yezg507/zTFAHUCgVtV7Xh8LaW2ryrlXHwIBVTW5P/8eP9CsT7a4qhrejadGeR8VHWPbS4pAeem4SOS2CXiMICaJGVGZGdKksGUbhpRK2YdnYcBiXDYMMxDDgHMUSVjaQ7hhhr9BTbdFrPJTLDxcq9qBlVHz61MpBoTqWOGxURonWAZ0stpPPvcvI5PeARKRjuUXOsRMNySuWlWLuXGWoMniO9mqkuL+JhfBb63VV0f5VvVbPpBbIAiCIPLs/qs8koWoacL6VvV3jFkJuY7ZKoG4wDgYuE4ziYyT5SFO0CjSsBRk5CDxFpKZwBjUIUISzs/BsQ0ntZQyICwSA5vBw7p84oR23gTzqSleG7yS9eUB8gx7xxDAbgc+4Qu65ORjojAFZ5HI3thAFP7MRLKHZk8Fjso0K8SR2Dvko7YtZbTkXcNKsC2D4JpFI8l8cMA93NA+xABxcpLBgCBpkOhm00F5qHVGLomBkThsiw0bZBZgSuGe9vFw5GEf2CikpResDi2JKPDGNFLGrdEy16VEEYiBorFNMG+Q3W0kO6nYNKRL8yneDLThhhNrIDpk2QVhsDKl32Tr1Jo0XWQUBMLBfv51p8X/SYr9MjHby06S+lakzEgojaiRmEVw1/mILum7IpThe9gs8Y/SoviZS3gKOGbbH/SZ+tRtjtv/WlXO+bObAWPFdAo4nne+O153/tkdzo38LLnDqb6OMU3Ocaf5VAIrwOFhR0fQccTtv+4Z3zZNLla+jukJyuDCUFLKDpeZepryk0lx6R46PRua7mqZIYJY3/qWd9zKHznHfX0ICggJB4cg7TVEPlGj4bgliSxSgFB5EpV5cVregXBhE0uPCFFKOwxnMuQd/RYaG0BDH6eId0ETX5QyoiWBRe2SYAkNDDFmHsJ9UYiTllJSupgohkPEsXvJJlKDIRIJzwYQBVZapXUv0YcXWgPEuC5m+gM3jX05/onBjV8Lb/NaAh6ugdjIzGtkYN81MiQ5u7EL5bMWeNnS64F4SODGFSgYxSAXntlMboHwfu/OUSyaooQQDh+9MKLBFvGX9mmHZYUGw0c0xMabhwEeMHXYwOhkCQUFtqpjB03P0frki00Qs51kfbJk89+Vjlu4z71KoKr7725/LlbqI1W+EfwdrZigdCN8bY4iKJLwA+VIkm6GdLZUFOUHytSlpvxA+lIRdJ87QZOloge6kdaZeqAH0nnSEI4oCGOLrX3sA2UK32uffG2Eb5TvxP03O1kfggxBEI6oHOEpSzZLkKXwvZjSGtssXriNlBWdZoLINmS3PG3gGpJJ6Ri3lqAoidKguHzMcdDIRGG6FPUU1IeqqonZIsjxLtPrU293gGJ96qWmadLWRtrmK25NgBs3Zm+VkYf9V7gn60PAOVXHMeD2PjieDudA0BRp63jduRrCTsfF0xw7x12rgZzKd+856URjpo9h5Zsb4xRwrrX3WgDiGWqoURkjxBBDREZEAhEJSuQ1cAKRGZuZ8hIosUAQpTiSQQgYBiFACAhhBiFEeERBq+SGNcAYN8pyUFtEzylXW1+92sN0NViANUJurOjbfZ/Z9udvNHdKBomSxTgOCP4beKQxgPZwbePchqHDuKOS1kIXT61dy0KpFpmiHQVlOSJMVFUl6mCDO2aXnKPzWiJ6kTKBQ2UetjogWjhhJKEPup3/MHifKre0/hrV02U0hb67F+Zgh4R4Zn9HvjOx6Y2S6LhfkPsiBmcWOhIg2ItBNhQBlzOx9f6EIqu7wX+90kYHSwtnDkC5gVkGPcOVwUXqV8xgMu4twCh+jFmSSw3sO5UhfVFo38TcZDaj0CSeNyxtyDkgICQ166XMofHeK6OpcWIm6t/i+HhkOYGqua3MoFdbgC1DL28P

You can read more about pull subscriptions and what each fields mean in the Google Pub/Sub docs.

Step 4: Decompress the data

Data in Allium's streams are typically compressed for optimization reasons. We employ 3 different compression methods, depending on the shape, size and frequency of data coming through the stream:

The easiest way to determine which compression method was used, is to check the first few bytes of the data. Here are the bytes that the data will start with depending on the compression method used:

MethodFirst bytes

gzip

\x1f\x8b

zstandard

\x28\xb5\x2f\xfd

lz4

\x04\x22\x4d\x18

Here are sample scripts that will help you decompress the data if necessary:

import gzip

import lz4.frame
import zstandard
from google.cloud import pubsub_v1

GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"
ZSTD_PREFIX_MAGIC_NUMBERS = b"\x28\xb5\x2f\xfd"
LZ4_PREFIX_MAGIC_NUMBERS = b"\x04\x22\x4d\x18"


def auto_decompress(data):
    if data.startswith(GZIP_PREFIX_MAGIC_NUMBERS):
        try:
            return gzip.decompress(data)
        except OSError as e:
            print(f"Error decompressing gzip data: {e}")
    elif data.startswith(ZSTD_PREFIX_MAGIC_NUMBERS):
        try:
            decompressor = zstandard.ZstdDecompressor()
            stream_reader = decompressor.stream_reader(data)
            decompressed = stream_reader.read()
            stream_reader.close()
            return decompressed
        except Exception as e:
            print(f"Error decompressing zstd data: {e}")
    elif data.startswith(LZ4_PREFIX_MAGIC_NUMBERS):
        try:
            return lz4.frame.decompress(data)
        except lz4.frame.LZ4FrameError as e:
            print(f"Error decompressing lz4 data: {e}")
    return data


def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    data = auto_decompress(message.data)
    print(f"Received data: {str(data)}")
    message.ack()

Last updated