Getting started with Google Pub/Sub
Integrating Allium's Pub/Sub data streams into your systems is a straightforward process that allows you to access real-time data seamlessly. Follow the steps below to create and manage Pub/Sub streams for your organization.
Step 1: Access the Integration Portal
To begin, visit the Allium Pub/Sub Integration page. Here, you can create and manage the Pub/Sub streams for the various types of data that Allium provides.
Step 2: Set Up Your Pub/Sub Subscription
Within the integration page, you can set up Pub/Sub streams tailored to the specific data your organization needs. Allium offers a variety of data types, including:
Raw Blockchain Data - e.g. blocks and transactions.
Decoded Data - e.g. decoded logs and traces.
Enriched Models - e.g. NFT data or DEX data.
You will need to set up a subscription to start receiving data. Allium supports two types of subscriptions:
Pull Subscription: This allows your system to pull data from the stream. When setting up a pull subscription, provide a service account or user that will be authorized to pull data.
Push Subscription: This option sends data directly to your specified webhook endpoint. When setting up a push subscription, ensure you provide the correct webhook URL where the data will be sent.
Step 3: Receive the data
Pull Subscription
To receive data, configure your code to connect to the Pub/Sub subscription and pull from it. Here's an example Python snippet to pull the data:
from google.cloud import pubsub_v1

project_id = "degenalytics"
subscription_id = "<your subscription ID>"


def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received data: {message.data}")
    message.ack()


def pull_messages():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    with subscriber:
        print(f"Listening for messages on {subscription_path}..\n")
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=handle_message
        )
        try:
            # Block until the stream fails or the process is interrupted.
            streaming_pull_future.result()
        except (Exception, KeyboardInterrupt):
            streaming_pull_future.cancel()  # Signal the background stream to stop.
            streaming_pull_future.result()  # Wait for the shutdown to finish.


if __name__ == "__main__":
    pull_messages()
The data should look something like:
Copy KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPhJQExASwB9eGn/1X1vIbf+CgYdgYpYIeFNvfARLQH8EMWvjFJfmbc8+oZ4lvu1mtVUx5uJ1kJCKiqmfKw8sezc9yuTy0gKICgqKoDxP7N86oDCDxXQFUH+O59Yezg507/zTFAHUCgVtV7Xh8LaW2ryrlXHwIBVTW5P/8eP9CsT7a4qhrejadGeR8VHWPbS4pAeem4SOS2CXiMICaJGVGZGdKksGUbhpRK2YdnYcBiXDYMMxDDgHMUSVjaQ7hhhr9BTbdFrPJTLDxcq9qBlVHz61MpBoTqWOGxURonWAZ0stpPPvcvI5PeARKRjuUXOsRMNySuWlWLuXGWoMniO9mqkuL+JhfBb63VV0f5VvVbPpBbIAiCIPLs/qs8koWoacL6VvV3jFkJuY7ZKoG4wDgYuE4ziYyT5SFO0CjSsBRk5CDxFpKZwBjUIUISzs/BsQ0ntZQyICwSA5vBw7p84oR23gTzqSleG7yS9eUB8gx7xxDAbgc+4Qu65ORjojAFZ5HI3thAFP7MRLKHZk8Fjso0K8SR2Dvko7YtZbTkXcNKsC2D4JpFI8l8cMA93NA+xABxcpLBgCBpkOhm00F5qHVGLomBkThsiw0bZBZgSuGe9vFw5GEf2CikpResDi2JKPDGNFLGrdEy16VEEYiBorFNMG+Q3W0kO6nYNKRL8yneDLThhhNrIDpk2QVhsDKl32Tr1Jo0XWQUBMLBfv51p8X/SYr9MjHby06S+lakzEgojaiRmEVw1/mILum7IpThe9gs8Y/SoviZS3gKOGbbH/SZ+tRtjtv/WlXO+bObAWPFdAo4nne+O153/tkdzo38LLnDqb6OMU3Ocaf5VAIrwOFhR0fQccTtv+4Z3zZNLla+jukJyuDCUFLKDpeZepryk0lx6R46PRua7mqZIYJY3/qWd9zKHznHfX0ICggJB4cg7TVEPlGj4bgliSxSgFB5EpV5cVregXBhE0uPCFFKOwxnMuQd/RYaG0BDH6eId0ETX5QyoiWBRe2SYAkNDDFmHsJ9UYiTllJSupgohkPEsXvJJlKDIRIJzwYQBVZapXUv0YcXWgPEuC5m+gM3jX05/onBjV8Lb/NaAh6ugdjIzGtkYN81MiQ5u7EL5bMWeNnS64F4SODGFSgYxSAXntlMboHwfu/OUSyaooQQDh+9MKLBFvGX9mmHZYUGw0c0xMabhwEeMHXYwOhkCQUFtqpjB03P0frki00Qs51kfbJk89+Vjlu4z71KoKr7725/LlbqI1W+EfwdrZigdCN8bY4iKJLwA+VIkm6GdLZUFOUHytSlpvxA+lIRdJ87QZOloge6kdaZeqAH0nnSEI4oCGOLrX3sA2UK32uffG2Eb5TvxP03O1kfggxBEI6oHOEpSzZLkKXwvZjSGtssXriNlBWdZoLINmS3PG3gGpJJ6Ri3lqAoidKguHzMcdDIRGG6FPUU1IeqqonZIsjxLtPrU293gGJ96qWmadLWRtrmK25NgBs3Zm+VkYf9V7gn60PAOVXHMeD2PjieDudA0BRp63jduRrCTsfF0xw7x12rgZzKd+856URjpo9h5Zsb4xRwrrX3WgDiGWqoURkjxBBDREZEAhEJSuQ1cAKRGZuZ8hIosUAQpTiSQQgYBiFACAhhBiFEeERBq+SGNcAYN8pyUFtEzylXW1+92sN0NViANUJurOjbfZ/Z9udvNHdKBomSxTgOCP4beKQxgPZwbePchqHDuKOS1kIXT61dy0KpFpmiHQVlOSJMVFUl6mCDO2aXnKPzWiJ6kTKBQ2UetjogWjhhJKEPup3/MHifKre0/hrV02U0hb67F+Zgh4R4Zn9HvjOx6Y2S6LhfkPsiBmcWOhIg2ItBNhQBlzOx9f6EIqu7wX+90kYHSwtnDkC5gVkGPcOVwUXqV8xgMu4twCh+jFmSSw3sO5UhfVF
o38TcZDaj0CSeNyxtyDkgICQ166XMofHeK6OpcWIm6t/i+HhkOYGqua3MoFdbgC1DL28P
You can read more about pull subscriptions and what each field means in the Google Pub/Sub docs.
Push Subscription
You should already be seeing data sent to the endpoint you configured. The data you receive should look something like:
{
"message" : {
"data":"KLUv/QBYHTAAamIgEyXgRo4PCo4XahMHfypstXTqD/Hg5uJgA2bgS1rrTGCzzDDDDPPhJQExASwB9eGn/1X1vIbf+CgYdgYpYIeFNvfARLQH8EMWvjFJfmbc8+oZ4lvu1mtVUx5uJ1kJCKiqmfKw8sezc9yuTy0gKICgqKoDxP7N86oDCDxXQFUH+O59Yezg507/zTFAHUCgVtV7Xh8LaW2ryrlXHwIBVTW5P/8eP9CsT7a4qhrejadGeR8VHWPbS4pAeem4SOS2CXiMICaJGVGZGdKksGUbhpRK2YdnYcBiXDYMMxDDgHMUSVjaQ7hhhr9BTbdFrPJTLDxcq9qBlVHz61MpBoTqWOGxURonWAZ0stpPPvcvI5PeARKRjuUXOsRMNySuWlWLuXGWoMniO9mqkuL+JhfBb63VV0f5VvVbPpBbIAiCIPLs/qs8koWoacL6VvV3jFkJuY7ZKoG4wDgYuE4ziYyT5SFO0CjSsBRk5CDxFpKZwBjUIUISzs/BsQ0ntZQyICwSA5vBw7p84oR23gTzqSleG7yS9eUB8gx7xxDAbgc+4Qu65ORjojAFZ5HI3thAFP7MRLKHZk8Fjso0K8SR2Dvko7YtZbTkXcNKsC2D4JpFI8l8cMA93NA+xABxcpLBgCBpkOhm00F5qHVGLomBkThsiw0bZBZgSuGe9vFw5GEf2CikpResDi2JKPDGNFLGrdEy16VEEYiBorFNMG+Q3W0kO6nYNKRL8yneDLThhhNrIDpk2QVhsDKl32Tr1Jo0XWQUBMLBfv51p8X/SYr9MjHby06S+lakzEgojaiRmEVw1/mILum7IpThe9gs8Y/SoviZS3gKOGbbH/SZ+tRtjtv/WlXO+bObAWPFdAo4nne+O153/tkdzo38LLnDqb6OMU3Ocaf5VAIrwOFhR0fQccTtv+4Z3zZNLla+jukJyuDCUFLKDpeZepryk0lx6R46PRua7mqZIYJY3/qWd9zKHznHfX0ICggJB4cg7TVEPlGj4bgliSxSgFB5EpV5cVregXBhE0uPCFFKOwxnMuQd/RYaG0BDH6eId0ETX5QyoiWBRe2SYAkNDDFmHsJ9UYiTllJSupgohkPEsXvJJlKDIRIJzwYQBVZapXUv0YcXWgPEuC5m+gM3jX05/onBjV8Lb/NaAh6ugdjIzGtkYN81MiQ5u7EL5bMWeNnS64F4SODGFSgYxSAXntlMboHwfu/OUSyaooQQDh+9MKLBFvGX9mmHZYUGw0c0xMabhwEeMHXYwOhkCQUFtqpjB03P0frki00Qs51kfbJk89+Vjlu4z71KoKr7725/LlbqI1W+EfwdrZigdCN8bY4iKJLwA+VIkm6GdLZUFOUHytSlpvxA+lIRdJ87QZOloge6kdaZeqAH0nnSEI4oCGOLrX3sA2UK32uffG2Eb5TvxP03O1kfggxBEI6oHOEpSzZLkKXwvZjSGtssXriNlBWdZoLINmS3PG3gGpJJ6Ri3lqAoidKguHzMcdDIRGG6FPUU1IeqqonZIsjxLtPrU293gGJ96qWmadLWRtrmK25NgBs3Zm+VkYf9V7gn60PAOVXHMeD2PjieDudA0BRp63jduRrCTsfF0xw7x12rgZzKd+856URjpo9h5Zsb4xRwrrX3WgDiGWqoURkjxBBDREZEAhEJSuQ1cAKRGZuZ8hIosUAQpTiSQQgYBiFACAhhBiFEeERBq+SGNcAYN8pyUFtEzylXW1+92sN0NViANUJurOjbfZ/Z9udvNHdKBomSxTgOCP4beKQxgPZwbePchqHDuKOS1kIXT61dy0KpFpmiHQVlOSJMVFUl6mCDO2aXnKPzWiJ6kTKBQ2UetjogWjhhJKEPup3/MHifKre0/hrV02U0hb67F+Zgh4R4Zn9HvjOx6Y2S6LhfkPsiBmcWOhIg2ItBNhQBlzOx9f6EIqu7wX+90kYHSwtnDkC5gVkGPcOVwUXqV8xgMu4twCh+jFmSSw3sO5Uh
fVFo38TcZDaj0CSeNyxtyDkgICQ166XMofHeK6OpcWIm6t/i+HhkOYGqua3MoFdbgC1DL28P",
"messageId" : "11996745375327517" ,
"message_id" : "11996745375327517" ,
"publishTime" : "2024-08-20T16:16:06.44Z" ,
"publish_time" : "2024-08-20T16:16:06.44Z"
} ,
"subscription" : "projects/degenalytics/subscriptions/allium_app.portal.solana.nonvoting_transactions.push"
}
You can read more about push subscriptions and what each field means in the Google Pub/Sub docs.
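In a push delivery, the `data` field is base64-encoded, so it has to be decoded to bytes before any decompression. The sketch below uses only the Python standard library and shows one way a webhook might unpack the envelope and acknowledge it. The names `extract_payload`, `PushHandler`, and `serve`, and the port number, are hypothetical and not part of Allium's or Google's tooling. Pub/Sub treats a success status code such as 204 as an acknowledgement and retries the delivery otherwise.

```python
import base64
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Tuple


def extract_payload(body: bytes) -> Tuple[str, bytes]:
    """Return (message ID, raw payload bytes) from a push request body."""
    envelope = json.loads(body)
    message = envelope["message"]
    # The envelope carries the payload as base64 text; decode it to bytes.
    payload = base64.b64decode(message.get("data", ""))
    return message["messageId"], payload


class PushHandler(BaseHTTPRequestHandler):
    """Hypothetical webhook handler for push deliveries."""

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        try:
            message_id, payload = extract_payload(self.rfile.read(length))
        except (ValueError, KeyError, TypeError):
            # Non-success status: Pub/Sub will retry the delivery.
            self.send_response(400)
            self.end_headers()
            return
        print(f"Message {message_id}: {len(payload)} bytes (possibly compressed)")
        self.send_response(204)  # Success status acknowledges the message.
        self.end_headers()


def serve(port: int = 8080) -> None:
    """Run the handler locally; the port is an arbitrary example value."""
    HTTPServer(("", port), PushHandler).serve_forever()
```

Calling `serve()` starts the listener. The bytes returned by `extract_payload` may still be compressed, as described in Step 4.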
Step 4: Decompress the data
Data in Allium's streams is typically compressed for optimization reasons. We employ three different compression methods, depending on the shape, size, and frequency of the data coming through the stream:
gzip
Zstandard (zstd)
LZ4
The easiest way to determine which compression method was used is to check the first few bytes of the data. Here are the bytes (in hexadecimal) that the data will start with, depending on the compression method used:
gzip - 1f 8b
Zstandard (zstd) - 28 b5 2f fd
LZ4 - 04 22 4d 18
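As a quick illustration of the prefix check, the following sketch classifies a byte string by its leading bytes using only the Python standard library. The helper name `detect_compression` is hypothetical. The prefix values are the ones used in the sample scripts in this step, and the base64 text is the first eight characters of the sample payload shown earlier.

```python
import base64
import gzip
from typing import Optional

# Magic numbers that open each supported format.
MAGIC_NUMBERS = {
    "gzip": b"\x1f\x8b",
    "zstd": b"\x28\xb5\x2f\xfd",
    "lz4": b"\x04\x22\x4d\x18",
}


def detect_compression(data: bytes) -> Optional[str]:
    """Return the name of the matching format, or None if nothing matches."""
    for name, magic in MAGIC_NUMBERS.items():
        if data.startswith(magic):
            return name
    return None


# First eight base64 characters of the sample payload shown earlier.
sample_prefix = base64.b64decode("KLUv/QBY")
print(sample_prefix[:4].hex(), detect_compression(sample_prefix))  # 28b52ffd zstd

# Output of the standard-library gzip module carries the gzip magic number.
print(detect_compression(gzip.compress(b"hello")))  # gzip

# Uncompressed bytes match none of the prefixes.
print(detect_compression(b'{"plain": true}'))  # None
```

The sample payload therefore appears to be zstd-compressed. Data that matches none of the prefixes can be treated as uncompressed.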
Here are sample scripts that will help you decompress the data if necessary:
Python
import gzip

import lz4.frame
import zstandard
from google.cloud import pubsub_v1

GZIP_PREFIX_MAGIC_NUMBERS = b"\x1f\x8b"
ZSTD_PREFIX_MAGIC_NUMBERS = b"\x28\xb5\x2f\xfd"
LZ4_PREFIX_MAGIC_NUMBERS = b"\x04\x22\x4d\x18"


def auto_decompress(data):
    # Pick the decompressor from the magic number at the start of the data.
    # Any failure is logged and falls through to returning the raw data.
    if data.startswith(GZIP_PREFIX_MAGIC_NUMBERS):
        try:
            return gzip.decompress(data)
        except Exception as e:
            print(f"Error decompressing gzip data: {e}")
    elif data.startswith(ZSTD_PREFIX_MAGIC_NUMBERS):
        try:
            # A stream reader also handles frames that do not record
            # their decompressed size in the header.
            decompressor = zstandard.ZstdDecompressor()
            stream_reader = decompressor.stream_reader(data)
            decompressed = stream_reader.read()
            stream_reader.close()
            return decompressed
        except Exception as e:
            print(f"Error decompressing zstd data: {e}")
    elif data.startswith(LZ4_PREFIX_MAGIC_NUMBERS):
        try:
            return lz4.frame.decompress(data)
        except Exception as e:
            print(f"Error decompressing lz4 data: {e}")
    # Unrecognized prefix or failed decompression: return the data unchanged.
    return data


def handle_message(message: pubsub_v1.subscriber.message.Message) -> None:
    data = auto_decompress(message.data)
    print(f"Received data: {data}")
    message.ack()
JavaScript
import * as zlib from 'zlib';
import * as lz4 from 'lz4';
import * as zstandard from '@fbneill/node-zstd';
import { Message } from '@google-cloud/pubsub';

const GZIP_PREFIX_MAGIC_NUMBERS = Buffer.from([0x1f, 0x8b]);
const ZSTD_PREFIX_MAGIC_NUMBERS = Buffer.from([0x28, 0xb5, 0x2f, 0xfd]);
const LZ4_PREFIX_MAGIC_NUMBERS = Buffer.from([0x04, 0x22, 0x4d, 0x18]);

// Pick the decompressor from the magic number at the start of the data.
function autoDecompress(data: Buffer): Buffer {
  if (data.slice(0, GZIP_PREFIX_MAGIC_NUMBERS.length).equals(GZIP_PREFIX_MAGIC_NUMBERS)) {
    try {
      return zlib.gunzipSync(data);
    } catch (e) {
      console.error(`Error decompressing gzip data: ${e}`);
    }
  } else if (data.slice(0, ZSTD_PREFIX_MAGIC_NUMBERS.length).equals(ZSTD_PREFIX_MAGIC_NUMBERS)) {
    try {
      return zstandard.decompress(data);
    } catch (e) {
      console.error(`Error decompressing zstd data: ${e}`);
    }
  } else if (data.slice(0, LZ4_PREFIX_MAGIC_NUMBERS.length).equals(LZ4_PREFIX_MAGIC_NUMBERS)) {
    try {
      return lz4.decode(data);
    } catch (e) {
      console.error(`Error decompressing lz4 data: ${e}`);
    }
  }
  // Unrecognized prefix or failed decompression: return the data unchanged.
  return data;
}

function handleMessage(message: Message): void {
  const data = autoDecompress(message.data);
  console.log(`Received data: ${data.toString()}`);
  message.ack();
}