Integrating Allium’s Kafka data streams into your systems allows you to access real-time blockchain data seamlessly with high throughput and low latency.
Python Example
This example demonstrates how to consume messages from Allium’s Kafka streams using Python.
Install the Confluent Library
Consumer Script
Create and execute the following script to start consuming messages:
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException
consumer = Consumer(
{
"bootstrap.servers": "Your Bootstrap Servers",
"group.id": "Your Group ID",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": "Your Username",
"sasl.password": "Your Password",
"auto.offset.reset": "latest",
}
)
def msg_process(msg):
print(msg.value())
# Process your message here
def consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
msg_process(msg)
finally:
# Close down consumer to commit final offsets
consumer.close()
# Subscribe to your desired topic
consume_loop(consumer, ["hyperliquid.misc_events"])
Replace the placeholder values ("Your Bootstrap Servers", "Your Group ID", etc.) with the actual credentials provided by Allium.
Processing MessagesUncomment and customize the msg_process(msg) function call to implement your own message processing logic.