Transform raw blockchain streams into actionable signals by applying custom filters and routing the results to your preferred destination.
Supported Destinations: Webhook endpoints, PubSub topics, Kafka topics

Why use Stream Transformations?

Selective Delivery

Receive only the events you care about: specific wallets, token transfers, or contract calls

Automated Pipelines

Data flows through filters into your systems without manual intervention

Dynamic Filters

Update filtering rules on the fly with immediate effect on running workflows

How It Works

Allium Stream Transformations handles the operational complexity: you define which data matters and where to send it.
Data transformation flow diagram
Check out the Datastream APIs for all the endpoints to manage your stream transformations programmatically.

Key Components

Example: Monitor Ethereum Wallet Activities

This example shows how to monitor activity on 10 Ethereum wallet addresses.
1. Create a Filter Data Source

Create a data source to hold your list of Ethereum addresses.

Request

curl --location 'https://api.allium.so/api/v1/streams/data-management/filter-data-sources' \
--header 'X-API-KEY: <YOUR_ALLIUM_API_KEY>' \
--header 'Content-Type: application/json' \
--data '{
    "name": "Ethereum Addresses",
    "type": "string_array",
    "description": "Allowlist of Ethereum addresses for transactions we want to track."
}'

Response

{
    "id": "3123712-aqw1-asdg-0293-1f48914e1e3f",
    "name": "Ethereum Addresses",
    "type": "string_array",
    "description": "Allowlist of Ethereum addresses for transactions we want to track."
}
2. Add Values to Filter Data Source

Add the Ethereum addresses you want to track. Changes take effect immediately on running workflows. Values can be added or deleted; the request below adds values.
curl --location 'https://api.allium.so/api/v1/streams/data-management/filter-data-sources/3123712-aqw1-asdg-0293-1f48914e1e3f/values' \
--header 'X-API-KEY: <YOUR_ALLIUM_API_KEY>' \
--header 'Content-Type: application/json' \
--data '{
    "operation": "ADD",
    "values": [
        "0x0dfcac7b81015ea777f79d4e2f03980091ea0333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea1333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea2333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea3333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea4333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea5333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea6333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea7333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea8333",
        "0x0dfcac7b81015ea777f79d4e2f03980091ea9333"
    ]
}'
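Hand-editing the JSON array becomes error-prone as the allowlist grows. As a minimal sketch, the helper below builds the request body from a list of addresses passed as arguments. The function names `build_values_payload` and `add_values` and the `ALLIUM_API_KEY` environment variable are hypothetical, and lowercasing the addresses is an assumption based on the format of the example values, not a documented requirement.

```shell
# Build the JSON body for the /values endpoint.
# $1 is the operation (only "ADD" is shown in this guide); the rest are addresses.
build_values_payload() {
    local operation="$1"
    shift
    local values="" addr
    for addr in "$@"; do
        # Assumption: lowercase to match the format of the example addresses.
        addr="$(printf '%s' "$addr" | tr '[:upper:]' '[:lower:]')"
        # Append a quoted value, comma-separated after the first one.
        values="${values:+$values,}\"$addr\""
    done
    printf '{"operation":"%s","values":[%s]}' "$operation" "$values"
}

# Hypothetical wrapper: send an ADD request for the given data source ID.
# Expects the API key in the ALLIUM_API_KEY environment variable (an assumption).
add_values() {
    local data_source_id="$1"
    shift
    curl --silent --show-error --fail \
        --location "https://api.allium.so/api/v1/streams/data-management/filter-data-sources/${data_source_id}/values" \
        --header "X-API-KEY: ${ALLIUM_API_KEY}" \
        --header 'Content-Type: application/json' \
        --data "$(build_values_payload ADD "$@")"
}
```

A call might look like `add_values 3123712-aqw1-asdg-0293-1f48914e1e3f 0x0dfcac7b81015ea777f79d4e2f03980091ea0333`. The builder does not escape quotes or backslashes, which is acceptable for hex addresses but not for arbitrary strings.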
3. Create a Filter

Create a filter that applies your data source to the Ethereum transactions stream.
Filters using a data source must specify "type": "data_source" in the configuration.

Request

curl --location 'https://api.allium.so/api/v1/streams/data-management/filters' \
--header 'X-API-KEY: <YOUR_ALLIUM_API_KEY>' \
--header 'Content-Type: application/json' \
--data '{
    "filter": {
        "field": "address",
        "operator": "IN",
        "type": "data_source",
        "data_source_id": "3123712-aqw1-asdg-0293-1f48914e1e3f"
    }
}'

Response

{
    "id": "afas231k-das9-1232-als1-a89c32ca3c57",
    "filter": {
        "field": "address",
        "value": [
            "0x0dfcac7b81015ea777f79d4e2f03980091ea0333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea1333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea2333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea3333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea4333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea5333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea6333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea7333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea8333",
            "0x0dfcac7b81015ea777f79d4e2f03980091ea9333"
        ],
        "operator": "IN"
    }
}
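Each create request returns an `id` that a later request needs: the data source ID goes into the filter, and the filter ID goes into the workflow. When scripting the steps, that value can be captured from the response instead of copied by hand. The sketch below uses `sed` and assumes the response has a single top-level `"id"` key with a string value, as in the examples in this guide; `extract_id` is a hypothetical helper name, and a real JSON parser such as `jq` would be more robust if it is available.

```shell
# Read a JSON response on stdin and print the value of its "id" key.
# Keys such as "filter_id" or "data_source_id" do not match, because the
# pattern requires the opening quote directly before id.
extract_id() {
    sed -n 's/.*"id"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p' | head -n 1
}
```

For example, piping the output of the filter request through `extract_id` and storing it with `filter_id="$(... | extract_id)"` makes the ID available to the next request.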
4. Create a Workflow

Configure a workflow to publish filtered transactions to your webhook.

Request

curl --location 'https://api.allium.so/api/v1/streams/data-management/workflows' \
--header 'X-API-KEY: <YOUR_ALLIUM_API_KEY>' \
--header 'Content-Type: application/json' \
--data '{
    "description": "Ethereum transactions workflow",
    "filter_id": "afas231k-das9-1232-als1-a89c32ca3c57",
    "data_source_config": {
        "type": "PUBSUB",
        "topic": "ethereum.transactions"
    },
    "data_destination_config": {
        "type": "PUBSUB",
        "delivery_type": "PUSH",
        "webhook_url": "https://my-ethereum-transactions-webhook.a.run.app/"
    }
}'

Response

{
    "id": "50225675-69a6-4abc-bfdc-1ad0b1a223f9",
    "description": "Ethereum transactions workflow",
    "filter_id": "afas231k-das9-1232-als1-a89c32ca3c57",
    "data_source_config": {
        "type": "PUBSUB",
        "topic": "ethereum.transactions"
    },
    "data_destination_config": {
        "type": "PUBSUB",
        "topic": "allium.ethereum.transactions.data_transformation.abc56e30-0cfb-4998-804d-8fc2fe0065fc",
        "subscription": "allium_app.allium.ethereum.transactions.destination.data_transformation.abc56e30-0cfb-4998-804d-8fc2fe0065fc.push",
        "delivery_type": "PUSH",
        "webhook_url": "https://my-ethereum-transactions-webhook.a.run.app/"
    },
    "external_workflow_id": null,
    "status": "running"
}
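When the filter ID and webhook URL come from variables rather than literals, the workflow request body can be templated. The following is a rough sketch that reuses the field names and values from the request above; the function name `build_workflow_payload` is hypothetical, and the source topic is hard-coded to `ethereum.transactions` purely to mirror this example.

```shell
# Build the workflow request body.
# $1 = filter ID, $2 = webhook URL, $3 = optional description.
# Values are inserted verbatim, so they must not contain quotes or backslashes.
build_workflow_payload() {
    local filter_id="$1"
    local webhook_url="$2"
    local description="${3:-Ethereum transactions workflow}"
    printf '{"description":"%s","filter_id":"%s","data_source_config":{"type":"PUBSUB","topic":"ethereum.transactions"},"data_destination_config":{"type":"PUBSUB","delivery_type":"PUSH","webhook_url":"%s"}}' \
        "$description" "$filter_id" "$webhook_url"
}
```

The output can be passed to curl with `--data "$(build_workflow_payload "$filter_id" "$webhook_url")"`.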
5. Verify Workflow Status

Poll the workflow until external_workflow_id is non-null, indicating your webhook is receiving filtered transactions.

Request

curl --location 'https://api.allium.so/api/v1/streams/data-management/workflows/50225675-69a6-4abc-bfdc-1ad0b1a223f9' \
--header 'X-API-KEY: <YOUR_ALLIUM_API_KEY>'
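The polling described in this step can be sketched as a loop around the request above. This is only an illustration under stated assumptions: the function names, the `ALLIUM_API_KEY` environment variable, and the default of 30 attempts spaced 10 seconds apart are all hypothetical, and the readiness check is a plain `grep` on the response text rather than a full JSON parse.

```shell
# Read a workflow response on stdin; succeed when external_workflow_id
# is present and its value is anything other than null.
workflow_is_ready() {
    grep -Eq '"external_workflow_id"[[:space:]]*:[[:space:]]*[^n[:space:]]'
}

# Poll the workflow until it is ready or the attempts run out.
# $1 = workflow ID, $2 = max attempts (default 30), $3 = delay in seconds (default 10).
wait_for_workflow() {
    local workflow_id="$1"
    local max_attempts="${2:-30}"
    local delay_seconds="${3:-10}"
    local attempt=1
    while [ "$attempt" -le "$max_attempts" ]; do
        if curl --silent --show-error --fail \
            --location "https://api.allium.so/api/v1/streams/data-management/workflows/${workflow_id}" \
            --header "X-API-KEY: ${ALLIUM_API_KEY}" | workflow_is_ready; then
            echo "workflow ${workflow_id} is ready"
            return 0
        fi
        sleep "$delay_seconds"
        attempt=$((attempt + 1))
    done
    echo "timed out waiting for workflow ${workflow_id}" >&2
    return 1
}
```

Calling `wait_for_workflow 50225675-69a6-4abc-bfdc-1ad0b1a223f9` returns a zero exit status once the field is populated, so the function can gate later steps in a script.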