Integrating Allium’s Kafka data streams into your systems allows you to access real-time data seamlessly.
Python Example
Install the Confluent Kafka Python client (`confluent_kafka`) with either uv or pip:
UV
uv add confluent_kafka
PIP
pip install confluent_kafka
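Create the following script, replace the placeholder connection values ("Your Bootstrap Servers", "Your Group ID", "Your Username", "Your Password") with your own, and run it: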
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException
# Module-level Kafka consumer used by the rest of this script.
# Every "Your ..." value below is a placeholder that must be replaced
# with real connection details before the script can connect.
consumer = Consumer(
    {
        # Broker address list and the consumer group this client joins.
        "bootstrap.servers": "Your Bootstrap Servers",
        "group.id": "Your Group ID",
        # SASL authentication over an SSL/TLS connection, PLAIN mechanism.
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "Your Username",
        "sasl.password": "Your Password",
        # Offset reset policy; per the librdkafka configuration docs,
        # "latest" starts from the newest messages when the group has no
        # committed offset.
        "auto.offset.reset": "latest",
    }
)
def msg_process(msg):
    """Print the raw payload of a consumed message to standard output.

    Args:
        msg: Message object exposing a ``value()`` method that returns
            the payload.
    """
    print(msg.value())
    # Placeholder: add application-specific handling of the message here.
def consume_loop(consumer, topics, on_message=None):
    """Subscribe to ``topics`` and poll ``consumer`` until an exception occurs.

    The loop has no normal exit: it runs until an exception propagates
    (a Kafka error, an exception from ``on_message``, or an interrupt).
    The consumer is always closed on the way out.

    Args:
        consumer: Consumer object providing ``subscribe``, ``poll`` and
            ``close``.
        topics: List of topic names to subscribe to.
        on_message: Optional callable invoked with each message that
            carries no error. When omitted, messages are polled and
            discarded.

    Raises:
        KafkaException: If a polled message carries an error other than
            the end-of-partition notification.
    """
    try:
        consumer.subscribe(topics)
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the one-second poll timeout.
                continue

            error = msg.error()
            if not error:
                if on_message is not None:
                    on_message(msg)
                continue

            if error.code() != KafkaError._PARTITION_EOF:
                raise KafkaException(error)

            # End of partition is informational, not a failure:
            # report it on stderr and keep polling.
            sys.stderr.write(
                f"% {msg.topic()} [{msg.partition()}] "
                f"reached end at offset {msg.offset()}\n"
            )
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
# Script entry point: start consuming the "hyperliquid.misc_events" topic.
# consume_loop polls in an endless loop, so this call only ends when an
# exception is raised.
consume_loop(consumer, ["hyperliquid.misc_events"])