Integrating Allium’s Kafka data streams into your systems allows you to access real-time blockchain data seamlessly with high throughput and low latency.

Python Example

This example demonstrates how to consume messages from Allium’s Kafka streams using Python.

Install the Confluent Library

  • uv: uv add confluent_kafka
  • pip: pip install confluent_kafka

Consumer Script

Create and execute the following script to start consuming messages:
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer(
    {
        "bootstrap.servers": "Your Bootstrap Servers",
        "group.id": "Your Group ID",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "Your Username",
        "sasl.password": "Your Password",
        "auto.offset.reset": "latest",
    }
)

def msg_process(msg):
    print(msg.value())
    # Process your message here

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets
        consumer.close()

# Subscribe to your desired topic
consume_loop(consumer, ["hyperliquid.misc_events"])
Replace the placeholder values ("Your Bootstrap Servers", "Your Group ID", etc.) with the actual credentials provided by Allium.
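Rather than pasting credentials into the script, you may prefer to read them from environment variables. The following is a minimal sketch of that approach. The ALLIUM_KAFKA_* variable names and the build_consumer_config helper are hypothetical and not part of Allium's documentation; only the configuration keys are taken from the script above.

```python
import os


def build_consumer_config(env=os.environ):
    """Build the Consumer config dict from environment variables.

    The ALLIUM_KAFKA_* names are hypothetical; rename them to suit
    your deployment.
    """
    required = {
        "bootstrap.servers": "ALLIUM_KAFKA_BOOTSTRAP_SERVERS",
        "group.id": "ALLIUM_KAFKA_GROUP_ID",
        "sasl.username": "ALLIUM_KAFKA_USERNAME",
        "sasl.password": "ALLIUM_KAFKA_PASSWORD",
    }
    # Fail early with a clear message if any credential is missing or empty.
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))

    config = {key: env[var] for key, var in required.items()}
    # Fixed settings copied from the consumer script above.
    config.update(
        {
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "auto.offset.reset": "latest",
        }
    )
    return config
```

The resulting dictionary can be passed directly to Consumer(...), for example consumer = Consumer(build_consumer_config()).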
Processing Messages

Customize the msg_process(msg) function to implement your own message processing logic.
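As one possible starting point, msg_process could decode each message before acting on it. The sketch below assumes message values are UTF-8 encoded JSON, which this page does not confirm; check the format of your topic and adjust accordingly. The decode_value helper is a hypothetical name introduced for illustration.

```python
import json


def decode_value(raw):
    """Decode a message value, ASSUMING it is UTF-8 encoded JSON.

    Returns the parsed object, or None if the value is empty or
    cannot be parsed.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def msg_process(msg):
    # msg.value() returns the raw payload as bytes (or None).
    event = decode_value(msg.value())
    if event is None:
        # Skip payloads that do not match the assumed format.
        return
    print(event)
    # Process your decoded message here
```

Keeping the decoding step in its own function makes it straightforward to test without a running Kafka connection, and to swap out if your topic uses a different serialization.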