
# Google Pub/Sub

> Integrate Allium's realtime data streams via Google Cloud Pub/Sub.

Allium's Pub/Sub data streams deliver realtime blockchain data to your systems through Google Cloud Pub/Sub. This guide walks you through requesting access, receiving messages, and decompressing their payloads.

## Subscription Types

Allium supports two types of Pub/Sub subscriptions:

<CardGroup cols={2}>
  <Card title="Pull Subscription" icon="download">
    Your system pulls data from the stream. Requires a service account or user
    authorized to pull data.
  </Card>

  <Card title="Push Subscription" icon="paper-plane">
    Data is sent directly to your webhook endpoint. Requires a valid webhook
    URL.
  </Card>
</CardGroup>

## Setup Guide

<Steps>
  <Step title="Request Access to Data Streams">
    Email [support@allium.so](mailto:support@allium.so) with the following information:

    * Chains and schemas you want to access
    * **For Pull Subscriptions**: Email address of the user or service account that will pull the data
    * **For Push Subscriptions**: Webhook URL to receive data

    Our team will confirm once your data streams are ready to use.
  </Step>

  <Step title="Receive the Data">
    <Tabs>
      <Tab title="Pull Subscription">
        Configure your code to connect and pull data from the Pub/Sub subscription:

        ```python theme={null}
        from google.cloud import pubsub_v1

        project_id = "degenalytics"
        subscription_id = "<your subscription ID>"

        def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
            print(f"Received data: {message.data}")
            message.ack()

        def pull_messages():
            subscriber = pubsub_v1.SubscriberClient()
            subscription_path = subscriber.subscription_path(project_id, subscription_id)

            # The context manager closes the client when the block exits.
            with subscriber:
                print(f"Listening for messages on {subscription_path}...\n")
                streaming_pull_future = subscriber.subscribe(
                    subscription_path, callback=handle_message
                )
                try:
                    # Block until the stream fails or the process is interrupted.
                    streaming_pull_future.result()
                except KeyboardInterrupt:
                    streaming_pull_future.cancel()  # Signal the stream to stop.
                    streaming_pull_future.result()  # Wait for shutdown to finish.

        if __name__ == "__main__":
            pull_messages()
        ```

        Each message's `data` field holds compressed bytes. Shown base64-encoded, a payload looks like this:

        ```
        KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPh...
        ```

        Learn more about pull subscriptions in the [Google Pub/Sub docs](https://cloud.google.com/pubsub/docs/pull).
      </Tab>

      <Tab title="Push Subscription">
        Pub/Sub sends each message to your configured webhook endpoint as a JSON request body in this format. The `data` field is the compressed payload, base64-encoded:

        ```json theme={null}
        {
          "message":{
            "data":"KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPh...",
            "messageId":"11996745375327517",
            "message_id":"11996745375327517",
            "publishTime":"2024-08-20T16:16:06.44Z",
            "publish_time":"2024-08-20T16:16:06.44Z"
          },
          "subscription":"projects/degenalytics/subscriptions/allium_app.portal.solana.nonvoting_transactions.push"
        }
        ```
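
        Because the push envelope carries the payload as base64 text, it has to be decoded to bytes before it can be decompressed. The following is a minimal sketch that assumes your webhook handler already has the raw request body in hand. `extract_push_payload` and the sample body are hypothetical names for illustration, and only the Python standard library is used.

        ```python
        import base64
        import json

        ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"


        def extract_push_payload(body: bytes) -> bytes:
            """Return the raw (still compressed) bytes from a push request body."""
            envelope = json.loads(body)
            # Push delivery wraps the payload as base64 text in message.data.
            return base64.b64decode(envelope["message"]["data"])


        # Hypothetical request body built around a short stand-in payload.
        sample_payload = ZSTD_MAGIC + b"stand-in bytes"
        sample_body = json.dumps(
            {
                "message": {
                    "data": base64.b64encode(sample_payload).decode("ascii"),
                    "messageId": "1",
                },
                "subscription": "projects/example/subscriptions/example",
            }
        ).encode("utf-8")

        raw = extract_push_payload(sample_body)
        print(raw.startswith(ZSTD_MAGIC))  # True
        ```

        The returned bytes are what the decompression step expects. The `KLUv/Q` prefix in the example above is the base64 form of the zstandard magic number.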

        Learn more about push subscriptions in the [Google Pub/Sub docs](https://cloud.google.com/pubsub/docs/push).
      </Tab>
    </Tabs>
  </Step>

  <Step title="Decompress the Data">
    Allium compresses stream data to reduce message size. Each message uses one of three compression methods, chosen according to the data characteristics:

    * [gzip](https://www.gzip.org/)
    * [zstandard](https://facebook.github.io/zstd/)
    * [lz4](https://github.com/lz4/lz4)

    ### Detect Compression Method

    Check the leading bytes (the magic number) of the message data to determine the compression method:

    | Method    | First Bytes        |
    | :-------- | :----------------- |
    | gzip      | `\x1f\x8b`         |
    | zstandard | `\x28\xb5\x2f\xfd` |
    | lz4       | `\x04\x22\x4d\x18` |
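
    As a small illustration of the table above, the lookup can be expressed as a prefix check. This is only a sketch: `detect_compression` and `MAGIC_NUMBERS` are hypothetical names, and it labels the method without decompressing anything, which can be handy for logging.

    ```python
    import gzip
    from typing import Optional

    # Magic numbers copied from the table above.
    MAGIC_NUMBERS = {
        "gzip": b"\x1f\x8b",
        "zstandard": b"\x28\xb5\x2f\xfd",
        "lz4": b"\x04\x22\x4d\x18",
    }


    def detect_compression(data: bytes) -> Optional[str]:
        """Return the method whose magic number prefixes data, or None."""
        for method, magic in MAGIC_NUMBERS.items():
            if data.startswith(magic):
                return method
        return None


    print(detect_compression(gzip.compress(b"example")))  # gzip
    print(detect_compression(b"plain bytes"))  # None
    ```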

    ### Decompression Scripts

    <CodeGroup>
      ```py Python theme={null}
      import gzip

      import lz4.frame
      import zstandard
      from google.cloud import pubsub_v1

      GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"
      ZSTD_PREFIX_MAGIC_NUMBERS = b"\x28\xb5\x2f\xfd"
      LZ4_PREFIX_MAGIC_NUMBERS = b"\x04\x22\x4d\x18"


      def auto_decompress(data):
          """Decompress data by its magic number; return it unchanged if unrecognized or on error."""
          if data.startswith(GZIP_PREFIX_MAGIC_NUMBERS):
              try:
                  return gzip.decompress(data)
              except OSError as e:
                  print(f"Error decompressing gzip data: {e}")
          elif data.startswith(ZSTD_PREFIX_MAGIC_NUMBERS):
              try:
                  decompressor = zstandard.ZstdDecompressor()
                  stream_reader = decompressor.stream_reader(data)
                  decompressed = stream_reader.read()
                  stream_reader.close()
                  return decompressed
              except Exception as e:
                  print(f"Error decompressing zstd data: {e}")
          elif data.startswith(LZ4_PREFIX_MAGIC_NUMBERS):
              try:
                  return lz4.frame.decompress(data)
              except lz4.frame.LZ4FrameError as e:
                  print(f"Error decompressing lz4 data: {e}")
          return data


      def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
          data = auto_decompress(message.data)
          print(f"Received data: {str(data)}")
          message.ack()
      ```

      ```js JavaScript theme={null}
      import * as zlib from 'zlib';
      import * as lz4 from 'lz4';
      import * as zstandard from '@fbneill/node-zstd';
      import { PubSub } from '@google-cloud/pubsub';

      const GZIP_PREFIX_MAGIC_NUMBERS = Buffer.from([0x1f, 0x8b]);
      const ZSTD_PREFIX_MAGIC_NUMBERS = Buffer.from([0x28, 0xb5, 0x2f, 0xfd]);
      const LZ4_PREFIX_MAGIC_NUMBERS = Buffer.from([0x04, 0x22, 0x4d, 0x18]);

      /**
       * Decompress a payload based on its magic number.
       * Returns the input unchanged if the format is unrecognized or decompression fails.
       * @param {Buffer} data
       * @returns {Buffer}
       */
      function autoDecompress(data) {
          if (data.slice(0, GZIP_PREFIX_MAGIC_NUMBERS.length).equals(GZIP_PREFIX_MAGIC_NUMBERS)) {
              try {
                  return zlib.gunzipSync(data);
              } catch (e) {
                  console.error(`Error decompressing gzip data: ${e}`);
              }
          } else if (data.slice(0, ZSTD_PREFIX_MAGIC_NUMBERS.length).equals(ZSTD_PREFIX_MAGIC_NUMBERS)) {
              try {
                  return zstandard.decompress(data);
              } catch (e) {
                  console.error(`Error decompressing zstd data: ${e}`);
              }
          } else if (data.slice(0, LZ4_PREFIX_MAGIC_NUMBERS.length).equals(LZ4_PREFIX_MAGIC_NUMBERS)) {
              try {
                  return lz4.decode(data);
              } catch (e) {
                  console.error(`Error decompressing lz4 data: ${e}`);
              }
          }
          return data;
      }

      /** @param {import('@google-cloud/pubsub').Message} message */
      function handleMessage(message) {
          const data = autoDecompress(message.data);
          console.log(`Received data: ${data.toString()}`);
          message.ack();
      }
      ```
    </CodeGroup>
  </Step>
</Steps>
