A Beam pipeline has three components that data flows through in order:

```
source → transforms[] → sinks[]
```
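A complete pipeline config might look like the sketch below. The top-level keys (`source`, `transforms`, `sinks`) mirror the component names above, but the exact wrapper schema is an assumption based on this structure; the individual component objects match the examples in the sections that follow.

```json
{
  "source": {
    "chain": "polygon",
    "entity": "log"
  },
  "transforms": [
    {
      "type": "redis_set_filter",
      "set_values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"],
      "filter_expr": "root = this.address"
    }
  ],
  "sinks": [
    { "type": "kafka", "name": "my-output-topic" }
  ]
}
```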
## Source

Beam sources connect to Allium's Datastreams. Select a blockchain and entity type to stream.

```json
{
  "chain": "polygon",
  "entity": "log",
  "is_zerolag": false
}
```
| Field | Required | Description |
|---|---|---|
| chain | Yes | Blockchain to source data from |
| entity | Yes | Entity type to stream (see below) |
| is_zerolag | No | Enable tip-of-chain mode to stream data from the absolute tip of the blockchain, before finality. Lower latency, but may include data from reorged blocks. Default: `false`. |
Check the Datastreams catalog for supported chains and entities, or contact support@allium.so to request a specific chain or entity.
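For latency-sensitive use cases, the same source can run in tip-of-chain mode by flipping `is_zerolag`; downstream consumers must then tolerate occasional data from reorged blocks:

```json
{
  "chain": "polygon",
  "entity": "log",
  "is_zerolag": true
}
```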
## Transforms

Two transform types are available. You can chain multiple transforms together; data flows through them in order.
### Set filter

Filter data by matching field values against a set. Only records whose extracted field value exists in your set pass through.

```json
{
  "type": "redis_set_filter",
  "set_values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"],
  "filter_expr": "root = this.address"
}
```
| Field | Description |
|---|---|
| set_values | Array of values to match against |
| filter_expr | Bloblang expression that extracts the field value to filter on |
When filtering by addresses, labels, or symbols, use lowercase values, matching how they are normalized in our system.
Bloblang `filter_expr` examples:

```
root = this.address       # Filter by contract address
root = this.topic0        # Filter by event signature
root = this.from_address  # Filter by sender address
root = this.to_address    # Filter by recipient address
```
Log entity field reference:

| Field | Description |
|---|---|
| address | Contract address that emitted the log |
| topic0 | First topic (usually the event signature hash) |
| topic1 | Second topic (first indexed parameter) |
| topic2 | Third topic (second indexed parameter) |
| topic3 | Fourth topic (third indexed parameter) |
| from_address | Transaction sender |
| to_address | Transaction recipient |
| data | Non-indexed event data |
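Putting these pieces together, here is a sketch of a set filter that passes only ERC-20 Transfer events. The set value is the keccak-256 hash of the `Transfer(address,address,uint256)` signature; verify the exact hash for your target event before deploying.

```json
{
  "type": "redis_set_filter",
  "set_values": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f1163c4a11628f55a4df523b3d"],
  "filter_expr": "root = this.topic0"
}
```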
### JavaScript (v8)

Transform data using JavaScript. Your function receives each record and can modify, enrich, or reshape it. Return `null` to drop a record.

```json
{
  "type": "v8",
  "script": "function transform(record) { record.parsed = true; return record; }"
}
```
| Field | Description |
|---|---|
| script | JavaScript code that processes each record |
Example: add a transfer size tag.

```javascript
function transform(record) {
  const amount = parseFloat(record.amount || "0");
  if (amount > 1000000) {
    record.size_tag = "whale";
  } else if (amount > 10000) {
    record.size_tag = "medium";
  } else {
    record.size_tag = "small";
  }
  return record;
}
```
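Returning `null` drops a record entirely. A minimal sketch that combines this with the lowercase-normalization rule above; the topic0 constant is the standard keccak-256 hash of `Transfer(address,address,uint256)`, and the field names follow the log entity reference:

```javascript
// Keccak-256 hash of Transfer(address,address,uint256).
const TRANSFER_TOPIC =
  "0xddf252ad1be2c89b69c2b068fc378daa952ba7f1163c4a11628f55a4df523b3d";

function transform(record) {
  // Returning null drops the record from the pipeline.
  if (record.topic0 !== TRANSFER_TOPIC) return null;
  // Normalize to lowercase, matching how addresses are stored in Allium.
  if (record.address) record.address = record.address.toLowerCase();
  return record;
}
```

Filtering in JavaScript works, but where possible prefer a set filter placed before the transform so records are dropped cheaply upstream.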
## Sinks

Sinks define where your processed data is delivered.

```json
{
  "type": "kafka",
  "name": "my-output-topic"
}
```
| Field | Description |
|---|---|
| type | Output type: `kafka` or `sns` |
| name | Topic name suffix for the output |
Kafka sinks: topics are named `beam.{config_id}.{name}`. After deployment, you receive connection credentials and consumer code snippets in Python and TypeScript. See our Kafka integration guide for more on consuming Kafka data.

SNS sinks: topics are named `beam-{config_id}-{name}`. See our SNS integration guide for details on consuming SNS data.
More sinks coming soon: Pub/Sub, ClickHouse, Postgres, GCS, NATS, RabbitMQ. Contact support@allium.so if you need a specific sink type.
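Since `sinks[]` is an array in the pipeline structure, a pipeline can presumably fan out to multiple destinations at once; the sketch below is illustrative and the exact multi-sink behavior should be confirmed:

```json
[
  { "type": "kafka", "name": "transfers" },
  { "type": "sns", "name": "transfers" }
]
```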
## Best practices

- Filter early: apply set filters before JavaScript transforms to reduce data volume
- Start simple: begin with basic filtering, then add transforms incrementally
- Monitor after deploy: always check worker health after deployment
- Test transforms: validate that JavaScript scripts handle edge cases (missing fields, unexpected types)
- Use descriptive names: clear pipeline and sink names make multiple pipelines easier to track
- Lowercase addresses: always use lowercase when filtering by addresses, labels, or symbols
- Redeploy, don't tear down: when updating a pipeline, just redeploy. Redeployment is idempotent, with zero downtime.
## Troubleshooting

### Unhealthy workers

If workers show as unhealthy:
- Check that the pipeline config is valid (source chain/entity exists, filter expressions are syntactically correct)
- Verify source data is available on the selected chain and entity
- Update the config and redeploy (no teardown needed)
### OOM-killed workers

If workers are being killed for running out of memory:
- Simplify JavaScript transforms to reduce memory usage
- Add more aggressive filtering earlier in the pipeline to reduce data volume
- Contact support@allium.so for resource limit adjustments
### Crashing workers

If workers are in a crash loop:
- Check JavaScript scripts for syntax errors
- Verify filter expressions are valid Bloblang
- Review transform logic for runtime errors (e.g., accessing undefined properties)