# Data Integration Guide

> Build a reliable sync pipeline using Allium's delivery metadata tables

When ingesting Allium blockchain data into your own warehouse (e.g., as a dbt source), a key challenge is knowing **what data was updated** and **when**. Blockchain data is not strictly append-only — late-arriving records, reorgs, and backfills mean that records with older `block_timestamp` values can be delivered after newer ones.

Allium provides **delivery metadata tables** that give you full transparency into every data update, enabling you to build a reliable and cost-efficient sync pipeline.

## Key Concepts

| Term              | Definition                                                                                                                                                    |
| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `block_timestamp` | The on-chain timestamp of a record — when the event happened on the blockchain.                                                                               |
| **Snapshot**      | A versioned point-in-time view of a table. Each snapshot may contain new records, backfilled records, or both. Snapshots are created roughly every hour.      |
| **Interval**      | An hourly slice of `block_timestamp` (e.g., all records where `block_timestamp` falls within `2026-01-12 05:00` (inclusive) to `2026-01-12 06:00` (exclusive)). |

<Warning>
  `block_timestamp` does not always increase monotonically with delivery time. A record delivered today could have a `block_timestamp` from last week (due to a backfill or patch). The metadata tables make this visible so your pipeline can handle it correctly.
</Warning>
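
As a minimal sketch of the interval definition above, the helper below maps a `block_timestamp` to the hourly interval that contains it. The function name `hour_interval` is illustrative and not part of Allium's tooling.

```python
from datetime import datetime, timedelta


def hour_interval(block_timestamp: datetime) -> tuple[datetime, datetime]:
    """Return the [start, end) hourly interval containing block_timestamp."""
    # Truncate to the top of the hour; the end bound is exclusive.
    start = block_timestamp.replace(minute=0, second=0, microsecond=0)
    return start, start + timedelta(hours=1)


start, end = hour_interval(datetime(2026, 1, 12, 5, 42, 7))
print(start, end)  # 2026-01-12 05:00:00 2026-01-12 06:00:00
```

A record stamped exactly `06:00:00` belongs to the next interval, since the upper bound is exclusive.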

## Metadata Tables

### `delivery_metadata.snapshots.aggregated`

Shows the latest state of each snapshot delivered, including what data intervals were loaded in each snapshot. This is the primary table for **steady-state syncing**.

| Column                                        | Type            | Description                                                                     |
| --------------------------------------------- | --------------- | ------------------------------------------------------------------------------- |
| `chain`                                       | `VARCHAR`       | Blockchain name (e.g., `ethereum`, `base`)                                      |
| `table_name`                                  | `VARCHAR`       | Table name (e.g., `raw.logs`, `raw.transactions`)                               |
| `watermark_column`                            | `VARCHAR`       | The timestamp column used for watermarking (typically `block_timestamp`)        |
| `watermark_level`                             | `TIMESTAMP_NTZ` | The high-water mark of the table at this snapshot                               |
| `delivery_interval`                           | `VARCHAR`       | The delivery cadence for this table                                             |
| `snapshot_id`                                 | `VARCHAR`       | Unique snapshot version identifier                                              |
| `is_full_refresh`                             | `BOOLEAN`       | Whether the dataset was fully refreshed during this snapshot                    |
| `snapshot_loaded_intervals__count`            | `NUMBER`        | Number of records loaded in this snapshot                                       |
| `snapshot_loaded_intervals__minutes`          | `FLOAT`         | Total minutes of `block_timestamp` coverage loaded                              |
| `snapshot_loaded_intervals__min_timestamp`    | `TIMESTAMP_NTZ` | Earliest `block_timestamp` of records loaded in this snapshot                   |
| `snapshot_loaded_intervals__max_timestamp`    | `TIMESTAMP_NTZ` | Latest `block_timestamp` of records loaded in this snapshot                     |
| `snapshot_loaded_intervals__merged_intervals` | `VARCHAR`       | Consolidated time ranges of loaded data                                         |
| `snapshot_loaded_intervals__loads`            | `VARCHAR`       | JSON array of individual data loads within the snapshot, with exact time ranges |
| `snapshot_created_at`                         | `TIMESTAMP_NTZ` | When this snapshot was created and became available to you                      |
| `created_at`                                  | `TIMESTAMP_NTZ` | When this metadata record was created                                           |

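Each row represents a single snapshot. `snapshot_loaded_intervals__min_timestamp` and `snapshot_loaded_intervals__max_timestamp` give the overall `block_timestamp` range of the data loaded in that snapshot. This range can be sparse: not every hour between the two bounds was necessarily loaded. The `snapshot_loaded_intervals__loads` and `snapshot_loaded_intervals__merged_intervals` fields provide the exact sub-ranges if you need finer granularity.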
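
To illustrate how a sparse range can arise, the sketch below consolidates individual load ranges into disjoint ranges, in the spirit of the merged-intervals column. The serialized format of the loads column is not specified on this page, so the `start` and `end` keys in the sample JSON are hypothetical; adapt the parsing to the payload you actually receive.

```python
import json
from datetime import datetime


def merge_ranges(ranges):
    """Merge overlapping or touching (start, end) ranges into sorted, disjoint ranges."""
    merged = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1]:
            # Overlaps or touches the previous range: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


# Hypothetical payload shape; the real keys in the loads column may differ.
loads_json = """[
  {"start": "2026-01-12T05:00:00", "end": "2026-01-12T06:00:00"},
  {"start": "2026-01-12T06:00:00", "end": "2026-01-12T07:00:00"},
  {"start": "2026-01-05T02:00:00", "end": "2026-01-05T03:00:00"}
]"""

loads = [
    (datetime.fromisoformat(item["start"]), datetime.fromisoformat(item["end"]))
    for item in json.loads(loads_json)
]
merged = merge_ranges(loads)
for start, end in merged:
    print(start, "->", end)
```

In this sample the overall range runs from Jan 5 02:00 to Jan 12 07:00, yet only three hours of data were loaded: one backfilled hour and two adjacent hours at the tip.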

### `delivery_metadata.intervals.changes`

Shows, for each hourly partition of data, when it was last updated. This is the primary table for **backfill detection**.

| Column                        | Type            | Description                                                                                                                                                   |
| ----------------------------- | --------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `chain`                       | `VARCHAR`       | Blockchain name                                                                                                                                               |
| `table_name`                  | `VARCHAR`       | Table name                                                                                                                                                    |
| `hour`                        | `TIMESTAMP_NTZ` | The hourly partition (`date_trunc('hour', block_timestamp)`). One row = one hour of data where `block_timestamp >= hour AND block_timestamp < hour + 1 hour`. |
| `last_updated_at`             | `TIMESTAMP_NTZ` | The most recent time this hour's data was modified                                                                                                            |
| `updated_timestamps`          | `VARCHAR`       | List of timestamps when this hour's data was updated                                                                                                          |
| `last_full_refresh_timestamp` | `TIMESTAMP_NTZ` | The last time this table was fully refreshed                                                                                                                  |
| `max_hours_late`              | `FLOAT`         | Maximum hours between `hour` and when the data was actually delivered — useful for measuring backfill lag                                                     |
| `_created_at`                 | `TIMESTAMP_NTZ` | When this metadata record was created                                                                                                                         |
| `_updated_at`                 | `TIMESTAMP_NTZ` | When this metadata record was last updated                                                                                                                    |

For example, if `base.raw.logs` has `hour = 2025-11-06 02:00` with `last_updated_at = 2026-02-20`, that means the hourly partition for Nov 6 2AM was last updated (e.g., patched/backfilled) on Feb 20.
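
The lag in this example can be worked out directly. The sketch below takes the largest gap, in hours, between an `hour` partition and its update times as a stand-in for `max_hours_late`. Allium's exact computation is not documented here, and the Feb 20 update is assumed to have happened at midnight, so treat the result as an approximation.

```python
from datetime import datetime


def max_hours_late(hour: datetime, updated_timestamps: list[datetime]) -> float:
    """Largest gap, in hours, between the partition hour and any update time."""
    return max((ts - hour).total_seconds() / 3600 for ts in updated_timestamps)


lag = max_hours_late(
    hour=datetime(2025, 11, 6, 2),
    # First delivery one hour after the partition start, then a late patch.
    updated_timestamps=[datetime(2025, 11, 6, 3), datetime(2026, 2, 20)],
)
print(lag)  # 2542.0
```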

## Recommended Integration Pattern

For full data alignment with Allium, you need **two jobs**:

1. A **steady-state job** (high cadence) — handles new data at the tip of chain
2. A **backfill job** (low cadence) — catches late-arriving historical patches

```mermaid theme={null}
flowchart TD
    A["Allium Data Delivery"] --> B["Delivery Metadata Tables"]

    subgraph B["Delivery Metadata Tables"]
        B1["<b>snapshots.aggregated</b><br/>What data was delivered<br/>in each snapshot, and when?"]
        B2["<b>intervals.changes</b><br/>Which hourly partitions<br/>were updated, and when?"]
    end

    B1 --> C1
    B2 --> C2

    subgraph C["Your Pipeline"]
        C1["<b>Job 1: Steady-state</b> (hourly)<br/>Query snapshots.aggregated for new data<br/>Pull block_timestamp range from source<br/>Cap lookback to e.g. 24–36 hours"]
        C2["<b>Job 2: Backfill</b> (daily/weekly)<br/>Query intervals.changes for late patches<br/>Re-sync affected hourly partitions<br/>Check for full refreshes"]
    end
```

### Job 1: Steady-State Sync

Runs every hour (or at whatever cadence your pipeline operates). Picks up newly delivered data snapshots.

Query `delivery_metadata.snapshots.aggregated` to find snapshots created since your last run:

```sql theme={null}
SELECT
    snapshot_id,
    snapshot_created_at,
    snapshot_loaded_intervals__min_timestamp,
    snapshot_loaded_intervals__max_timestamp,
    snapshot_loaded_intervals__loads,
    is_full_refresh
FROM delivery_metadata.snapshots.aggregated
WHERE chain = 'base'
  AND table_name = 'raw.logs'
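  -- Replace <last_pipeline_run_time> with your own tracking value. The
  -- 1-hour overlap guards against snapshots created around the previous run.
  AND snapshot_created_at >= <last_pipeline_run_time> - INTERVAL '1 hour'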
```

Then use the returned `snapshot_loaded_intervals__min_timestamp` and `snapshot_loaded_intervals__max_timestamp` to determine the `block_timestamp` range to pull from the source table.

<Tip>
  To keep scan costs predictable, cap the minimum time to a lookback window (e.g., only pull data from the last 24–36 hours). Any data changes older than that window will be caught by the backfill job.
</Tip>
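
The window selection and lookback cap described above can be sketched as follows. The function name `pull_window`, the dictionary-shaped rows, and the 36-hour default are assumptions made for illustration; only the column names come from the metadata table.

```python
from datetime import datetime, timedelta


def pull_window(snapshots, now, lookback_hours=36):
    """Return (start, end, full_refresh) for the next steady-state pull.

    snapshots: rows from snapshots.aggregated, as dicts keyed by column name.
    Returns None when there is nothing to pull within the lookback window.
    """
    if not snapshots:
        return None
    start = min(s["snapshot_loaded_intervals__min_timestamp"] for s in snapshots)
    end = max(s["snapshot_loaded_intervals__max_timestamp"] for s in snapshots)
    # Cap the lower bound; older changes are left to the backfill job.
    start = max(start, now - timedelta(hours=lookback_hours))
    if end < start:
        return None
    full_refresh = any(s["is_full_refresh"] for s in snapshots)
    return start, end, full_refresh


snapshots = [
    {
        "snapshot_loaded_intervals__min_timestamp": datetime(2026, 1, 12, 5),
        "snapshot_loaded_intervals__max_timestamp": datetime(2026, 1, 12, 7, 30),
        "is_full_refresh": False,
    },
    {
        # This snapshot also carried a backfill reaching back to Jan 3.
        "snapshot_loaded_intervals__min_timestamp": datetime(2026, 1, 3),
        "snapshot_loaded_intervals__max_timestamp": datetime(2026, 1, 12, 7, 45),
        "is_full_refresh": False,
    },
]
window = pull_window(snapshots, now=datetime(2026, 1, 12, 8))
print(window)
```

Here the uncapped range would start on Jan 3, but the cap moves the start to Jan 10 20:00 (36 hours before `now`). Because the max timestamp is the latest `block_timestamp` loaded, the pull should include the end bound.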

### Job 2: Backfill Sync

Runs daily or weekly. Catches any data that was patched or backfilled for historical time periods — records too old for the steady-state lookback window to catch.

Query `delivery_metadata.intervals.changes` to find hourly partitions that were recently updated:

```sql theme={null}
SELECT
    table_name,
    hour,
    last_updated_at,
    last_full_refresh_timestamp
FROM delivery_metadata.intervals.changes
WHERE table_name = 'base.raw.logs'
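  -- The lookback should span at least the time between backfill runs
  -- (1 day for a daily job; use e.g. INTERVAL '7 days' for a weekly job).
  AND last_updated_at >= CURRENT_TIMESTAMP - INTERVAL '1 day'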
ORDER BY hour
```

The returned `hour` values are the partitions you need to re-sync. Compare `last_updated_at` against your own internal tracking timestamp to determine which hours actually need patching.

You can also check `last_full_refresh_timestamp` to detect if a full table refresh occurred — compare it against your internal timestamp to know if a full re-ingestion is needed.
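
A minimal sketch of both comparisons follows. The internal tracking structures (`synced_at`, a map from hour to the time you last synced it, and `last_full_sync`) are hypothetical; substitute whatever bookkeeping your pipeline already keeps.

```python
from datetime import datetime


def plan_backfill(changes, synced_at, last_full_sync):
    """Decide what the backfill job must redo.

    changes: rows from intervals.changes, as dicts keyed by column name.
    synced_at: hypothetical tracking map, hour -> when we last synced it.
    last_full_sync: when we last fully ingested the table.
    Returns (needs_full_reingest, sorted list of hours to re-sync).
    """
    needs_full = any(
        row["last_full_refresh_timestamp"] is not None
        and row["last_full_refresh_timestamp"] > last_full_sync
        for row in changes
    )
    stale_hours = sorted(
        row["hour"]
        for row in changes
        # Never-synced hours, or hours updated after our last sync, are stale.
        if row["hour"] not in synced_at
        or row["last_updated_at"] > synced_at[row["hour"]]
    )
    return needs_full, stale_hours


changes = [
    {
        "hour": datetime(2025, 11, 6, 2),
        "last_updated_at": datetime(2026, 2, 20, 4),
        "last_full_refresh_timestamp": datetime(2025, 12, 1),
    },
    {
        "hour": datetime(2025, 11, 6, 3),
        "last_updated_at": datetime(2026, 2, 10),
        "last_full_refresh_timestamp": datetime(2025, 12, 1),
    },
]
synced_at = {
    datetime(2025, 11, 6, 2): datetime(2026, 2, 15),
    datetime(2025, 11, 6, 3): datetime(2026, 2, 15),
}
needs_full, stale = plan_backfill(changes, synced_at, last_full_sync=datetime(2026, 1, 1))
print(needs_full, stale)
```

Only the 02:00 partition is flagged, because it was updated after the last local sync. No full re-ingestion is needed, since the last full refresh predates `last_full_sync`.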

## How the Two Jobs Work Together

**Steady-state** optimizes for speed and cost — it handles the common case of new data arriving at the leading edge. **Backfill** ensures full correctness by catching the uncommon case of late-arriving historical patches (data from weeks or months ago being corrected).

Together, these two jobs ensure your warehouse stays fully aligned with Allium.

## dbt Integration Example

If you use dbt with Allium as a source, here's how you might structure your incremental model:

```sql theme={null}
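-- models/staging/stg_base_raw_logs.sql
{{ config(
    materialized='incremental',
    unique_key=['block_number', 'transaction_index', 'log_index'],
    incremental_strategy='merge'
) }}

SELECT
    *,
    -- Load-time marker read by the incremental filter below (assumed to be
    -- a column you add; rename it to match your own conventions).
    CURRENT_TIMESTAMP() AS _loaded_at
FROM {{ source('allium', 'base_raw_logs') }}

{% if is_incremental() %}
-- Pull everything from the earliest block_timestamp touched by any snapshot
-- created since the last load, with a 1-hour overlap.
WHERE block_timestamp >= (
    SELECT MIN(snapshot_loaded_intervals__min_timestamp)
    FROM {{ source('allium_delivery_metadata', 'snapshots_aggregated') }}
    WHERE chain = 'base'
      AND table_name = 'raw.logs'  -- same chain/table_name split as the Job 1 query
      AND snapshot_created_at >= (
          SELECT MAX(_loaded_at) FROM {{ this }}
      ) - INTERVAL '1 hour'
)
{% endif %}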
```

For the backfill job, create a separate model or macro that queries `intervals.changes` and re-processes the affected partitions.
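
One way to drive such a model is to turn the affected hours into a SQL predicate. The sketch below collapses contiguous hours into ranges and renders a filter on `block_timestamp`. The helper names are hypothetical, and how you pass the predicate into dbt (for example through `--vars`) is left to your setup. The timestamps are assumed to come straight from the metadata table, not from user input.

```python
from datetime import datetime, timedelta


def coalesce_hours(hours):
    """Collapse hourly partitions into contiguous [start, end) ranges."""
    ranges = []
    for hour in sorted(set(hours)):
        if ranges and ranges[-1][1] == hour:
            # This hour starts where the previous range ends: extend it.
            ranges[-1] = (ranges[-1][0], hour + timedelta(hours=1))
        else:
            ranges.append((hour, hour + timedelta(hours=1)))
    return ranges


def backfill_predicate(hours, column="block_timestamp"):
    """Render a SQL predicate covering exactly the given hourly partitions."""
    clauses = [
        f"({column} >= '{start:%Y-%m-%d %H:%M:%S}' AND {column} < '{end:%Y-%m-%d %H:%M:%S}')"
        for start, end in coalesce_hours(hours)
    ]
    # With no affected hours, match nothing.
    return " OR ".join(clauses) if clauses else "FALSE"


predicate = backfill_predicate([
    datetime(2025, 11, 6, 2),
    datetime(2025, 11, 6, 3),
    datetime(2025, 11, 9, 10),
])
print(predicate)
```

Coalescing keeps the predicate short when a patch touches many consecutive hours, while the half-open bounds match the hourly partition definition used by `intervals.changes`.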

## Further Reading

* [Metadata columns](/historical-data/overview/data-faq/metadata-columns)
