Integrating Allium’s Kafka data streams into your systems lets you consume real-time data directly as it is produced.

Python Example

Install the confluent-kafka library

UV

uv add confluent-kafka

PIP

pip install confluent-kafka
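
Optionally, verify the installation by printing the client and bundled librdkafka versions (the exact version numbers you see will vary with your environment):

python -c "import confluent_kafka; print(confluent_kafka.version(), confluent_kafka.libversion())"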

Create the following script and execute it

import sys
from confluent_kafka import Consumer, KafkaError, KafkaException

# Configure the consumer with your connection details and SASL credentials.
consumer = Consumer(
    {
        "bootstrap.servers": "Your Bootstrap Servers",
        "group.id": "Your Group ID",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "Your Username",
        "sasl.password": "Your Password",
        "auto.offset.reset": "latest",
    }
)

def msg_process(msg):
    # msg.value() returns the raw payload bytes of a single event.
    print(msg.value())
    # Replace this with your own processing logic.

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        while True:
            # Poll for the next message, waiting up to 1 second.
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Reaching the end of a partition is informational, not fatal.
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

consume_loop(consumer, ["hyperliquid.misc_events"])
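
The payload returned by msg.value() is raw bytes. If the events on your topic are JSON-encoded (an assumption worth confirming against the messages you actually receive), a msg_process like the sketch below decodes each payload before handing it to your own logic:

import json

def msg_process(msg):
    # Assumes UTF-8 encoded JSON payloads; adjust if your topic uses a
    # different serialization format.
    event = json.loads(msg.value().decode("utf-8"))
    # Replace the print with whatever downstream handling you need.
    print(event)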