Integrating Allium’s Kafka data streams into your systems gives you seamless access to real-time blockchain data with high throughput and low latency.

Python Example

This example demonstrates how to consume messages from Allium’s Kafka streams using Python.

Install the Confluent Library

  • uv: uv add confluent-kafka
  • pip: pip install confluent-kafka

Consumer Script

Create and execute the following script to start consuming messages:
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer(
    {
        "bootstrap.servers": "Your Bootstrap Servers",
        "group.id": "Your Group ID",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "Your Username",
        "sasl.password": "Your Password",
        "auto.offset.reset": "latest",
    }
)

def msg_process(msg):
    print(msg.value())
    # Process your message here

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets
        consumer.close()

# Subscribe to your desired topic
consume_loop(consumer, ["hyperliquid.misc_events"])
Replace the placeholder values ("Your Bootstrap Servers", "Your Group ID", etc.) with the actual credentials provided by Allium.
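One way to avoid hard-coding those credentials in the script is to read them from environment variables. A minimal sketch; the `ALLIUM_KAFKA_*` variable names are hypothetical, so use whatever your deployment provides:

```python
import os

# Hypothetical environment variable names; adjust to your deployment.
config = {
    "bootstrap.servers": os.environ.get("ALLIUM_KAFKA_BOOTSTRAP_SERVERS", ""),
    "group.id": os.environ.get("ALLIUM_KAFKA_GROUP_ID", ""),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.environ.get("ALLIUM_KAFKA_USERNAME", ""),
    "sasl.password": os.environ.get("ALLIUM_KAFKA_PASSWORD", ""),
    "auto.offset.reset": "latest",
}
# Pass this dict to Consumer(config) in place of the inline literal above.
```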
Processing Messages

Customize the msg_process(msg) function to implement your own message processing logic, replacing the # Process your message here placeholder.
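If the payloads on your topic are UTF-8 encoded JSON (an assumption; confirm the schema for your stream), msg_process can decode the raw bytes returned by msg.value(). A sketch, with a hypothetical payload shape for illustration:

```python
import json

def msg_process(raw_value: bytes) -> dict:
    """Decode one message payload, assuming a UTF-8 encoded JSON object."""
    event = json.loads(raw_value.decode("utf-8"))
    # Route, transform, or persist the event here.
    return event

# Hypothetical payload fields, for illustration only.
sample = b'{"block_number": 123, "event_type": "transfer"}'
event = msg_process(sample)
```

In the consumer script above, you would call it as msg_process(msg.value()).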