# Add the confluent_kafka client package (imported by the consumer script) using uv.
uv add confluent_kafka
# Install the same package using pip.
# NOTE(review): presumably an alternative to the uv command, not an extra step — confirm.
pip install confluent_kafka
import sys

from confluent_kafka import Consumer, KafkaError, KafkaException

# Client configuration. Every "Your ..." value is a placeholder that must be
# replaced before the consumer can connect.
# NOTE(review): prefer loading the SASL username/password from the
# environment or a secret store rather than committing them to source.
consumer = Consumer(
    {
        "bootstrap.servers": "Your Bootstrap Servers",
        "group.id": "Your Group ID",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "Your Username",
        "sasl.password": "Your Password",
        "auto.offset.reset": "latest",
    }
)


def msg_process(msg):
    """Handle one received message by printing its payload.

    Args:
        msg: A message object as returned by ``consumer.poll``; only
            ``msg.value()`` is read.
    """
    print(msg.value())
    # do stuff


def consume_loop(consumer, topics):
    """Subscribe to *topics* and poll for messages indefinitely.

    The loop only ends when an exception propagates (including
    ``KeyboardInterrupt``); the consumer is closed on the way out.

    Args:
        consumer: The consumer to subscribe, poll, and finally close.
        topics: Topic names passed to ``consumer.subscribe``.

    Raises:
        KafkaException: If a polled message carries any error other than
            ``KafkaError._PARTITION_EOF``.
    """
    try:
        consumer.subscribe(topics)

        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the one-second timeout; poll again.
                continue

            # Read the error once instead of re-querying it for every check.
            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational only: report it on
                    # stderr and keep polling.
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Any other error aborts the loop.
                    raise KafkaException(error)
            else:
                # Message handling is intentionally left disabled; enable the
                # call below to process each message.
                # msg_process(msg)
                pass
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()


consume_loop(consumer, ["hyperliquid.misc_events"])