> ## Documentation Index
> Fetch the complete documentation index at: https://docs.allium.so/llms.txt
> Use this file to discover all available pages before exploring further.

# Apache Kafka

> Integrate Allium's realtime data streams via Apache Kafka.

Integrating Allium's Kafka data streams into your systems allows you to access realtime blockchain data seamlessly with high throughput and low latency.

## Python Example

This example demonstrates how to consume messages from Allium's Kafka streams using Python.

### Install the Confluent Library

<Tabs>
  <Tab title="uv">
    ```bash theme={null}
    uv add confluent_kafka
    ```
  </Tab>

  <Tab title="pip">
    ```bash theme={null}
    pip install confluent_kafka
    ```
  </Tab>
</Tabs>

### Consumer Script

Create and execute the following script to start consuming messages:

```python theme={null}
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer(
    {
        "bootstrap.servers": "Your Bootstrap Servers",
        "group.id": "Your Group ID",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "Your Username",
        "sasl.password": "Your Password",
        "auto.offset.reset": "latest",
    }
)

def msg_process(msg):
    print(msg.value())
    # Process your message here

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets
        consumer.close()

# Subscribe to your desired topic
consume_loop(consumer, ["hyperliquid.misc_events"])
```

<Note>
  Replace the placeholder values (`"Your Bootstrap Servers"`, `"Your Group ID"`,
  etc.) with the actual credentials provided by Allium.
</Note>

<Tip>
  **Processing Messages**

  Uncomment and customize the `msg_process(msg)` function call to implement your own message processing logic.
</Tip>
