A Beam pipeline has three components that data flows through in order:
source → transforms[] → sinks[]

Source

Beam sources connect to Allium’s Datastreams. Select a blockchain and entity type to stream.
{
  "chain": "polygon",
  "entity": "log",
  "is_zerolag": false
}
Field        Required  Description
chain        Yes       Blockchain to source data from
entity       Yes       Entity type to stream (see below)
is_zerolag   No        Enable tip-of-chain mode to stream data from the absolute tip of the blockchain, before finality. Lower latency, but may include data from reorged blocks. Default: false.
Check the Datastreams catalog for supported chains and entities, or contact support@allium.so to request a specific chain or entity.

Transforms

Two transform types are available. You can chain multiple transforms together — data flows through them in order.

Set filter

Filter data by matching field values against a set. Only records whose extracted field value exists in your set pass through.
{
  "type": "redis_set_filter",
  "set_values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"],
  "filter_expr": "root = this.address"
}
Field        Description
set_values   Array of values to match against
filter_expr  Bloblang expression that extracts the field value to filter on
When filtering by addresses, labels, or symbols, use lowercase values. This is how these values are normalized in our system.
Bloblang filter_expr examples:
root = this.address        # Filter by contract address
root = this.topic0         # Filter by event signature
root = this.from_address   # Filter by sender address
root = this.to_address     # Filter by recipient address
Log entity field reference:
Field         Description
address       Contract address that emitted the log
topic0        First topic (usually the event signature hash)
topic1        Second topic (first indexed parameter)
topic2        Third topic (second indexed parameter)
topic3        Fourth topic (third indexed parameter)
from_address  Transaction sender
to_address    Transaction recipient
data          Non-indexed event data
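The field reference above pairs naturally with a set filter. For example, to pass only ERC-20 Transfer events, filter on topic0 using the keccak256 hash of the standard Transfer(address,address,uint256) signature (a sketch mirroring the set-filter shape shown earlier):

```json
{
  "type": "redis_set_filter",
  "set_values": [
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
  ],
  "filter_expr": "root = this.topic0"
}
```

Note the hash is already lowercase, matching the normalization rule above.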

JavaScript (v8)

Transform data using JavaScript. Your function receives each record and can modify, enrich, or reshape it. Return null to drop a record.
{
  "type": "v8",
  "script": "function transform(record) { record.parsed = true; return record; }"
}
Field   Description
script  JavaScript code that processes each record
Example — add a transfer size tag:
function transform(record) {
  const amount = parseFloat(record.amount || "0");
  if (amount > 1000000) {
    record.size_tag = "whale";
  } else if (amount > 10000) {
    record.size_tag = "medium";
  } else {
    record.size_tag = "small";
  }
  return record;
}
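Returning null drops the record entirely. A minimal sketch of a drop filter (the amount field is an assumption carried over from the example above):

```javascript
function transform(record) {
  // parseFloat of a missing amount falls back to "0", which is below the
  // threshold, so records without an amount are dropped as well
  const amount = parseFloat(record.amount || "0");
  if (amount < 10000) {
    return null; // drop small transfers
  }
  return record;
}
```
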

Sinks

Sinks define where your processed data is delivered.
{
  "type": "kafka",
  "name": "my-output-topic"
}
Field   Description
type    Output type: kafka or sns
name    Topic name suffix for the output
Kafka sinks: topics are named beam.{config_id}.{name}. After deployment, you receive connection credentials and consumer code snippets in Python and TypeScript. See our Kafka integration guide for more on consuming Kafka data.
SNS sinks: topics are named beam-{config_id}-{name}. See our SNS integration guide for details on consuming SNS data.
More sinks coming soon: Pub/Sub, ClickHouse, Postgres, GCS, NATS, RabbitMQ. Contact support@allium.so if you need a specific sink type.
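The two naming schemes differ only in their separator. A throwaway sketch to illustrate (beamTopicName is hypothetical; the platform derives topic names for you):

```javascript
function beamTopicName(sinkType, configId, name) {
  // Kafka topics join segments with dots, SNS topics with dashes,
  // per the naming schemes described above
  const sep = sinkType === "kafka" ? "." : "-";
  return ["beam", configId, name].join(sep);
}
```
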

Best practices

  1. Filter early — apply set filters before JavaScript transforms to reduce data volume
  2. Start simple — begin with basic filtering, add transforms incrementally
  3. Monitor after deploy — always check worker health after deployment
  4. Test transforms — validate JavaScript scripts handle edge cases (missing fields, unexpected types)
  5. Use descriptive names — clear pipeline and sink names help track multiple pipelines
  6. Lowercase addresses — always use lowercase when filtering by addresses, labels, or symbols
  7. Redeploy, don’t tear down — when updating a pipeline, just redeploy; redeploys are idempotent with zero downtime.
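Putting several of these practices together, a complete pipeline config might look like the sketch below. The top-level source/transforms/sinks keys follow the source → transforms[] → sinks[] flow described at the top; the exact wrapper shape may differ in your deployment. The set filter runs before the v8 transform, per "filter early":

```json
{
  "source": { "chain": "polygon", "entity": "log", "is_zerolag": false },
  "transforms": [
    {
      "type": "redis_set_filter",
      "set_values": ["0x3c499c542cef5e3811e1192ce70d8cc03d5c3359"],
      "filter_expr": "root = this.address"
    },
    {
      "type": "v8",
      "script": "function transform(record) { record.parsed = true; return record; }"
    }
  ],
  "sinks": [{ "type": "kafka", "name": "my-output-topic" }]
}
```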

Troubleshooting

Unhealthy workers

If workers show as unhealthy:
  1. Check that the pipeline config is valid (source chain/entity exists, filter expressions are syntactically correct)
  2. Verify source data is available on the selected chain and entity
  3. Update the config and redeploy (no teardown needed)

OOM killed workers

If workers are being killed for out-of-memory:
  1. Simplify JavaScript transforms to reduce memory usage
  2. Add more aggressive filtering earlier in the pipeline to reduce data volume
  3. Contact support@allium.so for resource limit adjustments

Crashing workers

If workers are in a crash loop:
  1. Check JavaScript scripts for syntax errors
  2. Verify filter expressions are valid Bloblang
  3. Review transform logic for runtime errors (e.g., accessing undefined properties)
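A defensive pattern for point 3 is to guard field access before use. A sketch (data_length is a hypothetical enrichment; data is the log entity's non-indexed payload from the field reference above):

```javascript
function transform(record) {
  // Guard against a missing or empty data field before decoding, so
  // malformed records pass through unchanged instead of crashing the worker
  const data = record.data;
  if (typeof data !== "string" || !data.startsWith("0x") || data === "0x") {
    return record;
  }
  record.data_length = (data.length - 2) / 2; // payload size in bytes
  return record;
}
```
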