Data Layers

Understanding how Dango organizes data across schemas.


Overview

Dango organizes data into four layers, following modern analytics engineering best practices:

raw → staging → intermediate → marts

Each layer has a specific purpose and transformation logic:

Layer        | Schema         | Purpose                 | Managed By              | Materialization
-------------|----------------|-------------------------|-------------------------|----------------
Raw          | raw_{source}.* | Source data as-loaded   | dlt                     | Tables
Staging      | staging.*      | Cleaned data            | Auto-generated by Dango | Tables
Intermediate | intermediate.* | Reusable business logic | You (dbt)               | Tables
Marts        | marts.*        | Final metrics for BI    | You (dbt)               | Tables

Raw Layer

Purpose

Store data exactly as it comes from the source, with zero transformations.

Schema Naming

All sources use the raw_{source_name} naming convention:

raw_{source_name}.{table_name}

Examples

-- CSV source named "orders"
SELECT * FROM raw_orders.orders;

-- Stripe source named "stripe_payments"
SELECT * FROM raw_stripe_payments.charges;
SELECT * FROM raw_stripe_payments.customers;
SELECT * FROM raw_stripe_payments.subscriptions;

-- HubSpot source named "hubspot_crm"
SELECT * FROM raw_hubspot_crm.contacts;
SELECT * FROM raw_hubspot_crm.companies;
SELECT * FROM raw_hubspot_crm.deals;

dlt Metadata Columns

Every raw table includes dlt tracking columns:

Column            | Type      | Purpose
------------------|-----------|--------------------------------
_dlt_load_id      | TEXT      | Unique ID for each pipeline run
_dlt_extracted_at | TIMESTAMP | When dlt extracted this row
_dlt_id           | TEXT      | Unique row identifier

Example:

SELECT
    id,
    customer_id,
    amount,
    _dlt_load_id,
    _dlt_extracted_at
FROM raw_stripe_payments.charges
LIMIT 5;

dlt Internal Tables

In addition to your data tables, each raw schema contains internal tables used by dlt for tracking:

Table               | Purpose
--------------------|------------------------------------------------------------------------------------------------
_dlt_loads          | Records each pipeline run (load ID, status, timestamp). Use this to audit when data was loaded.
_dlt_pipeline_state | Stores pipeline state for incremental loading (cursors, last values). Do not modify this table.
_dlt_version        | Schema version tracking for automatic schema evolution.

These tables are managed automatically. You can query _dlt_loads to check when the last successful sync occurred:

SELECT schema_name, status, inserted_at
FROM raw_stripe_payments._dlt_loads
ORDER BY inserted_at DESC
LIMIT 5;

Data Loading

Raw tables store data as-loaded from sources:

  • dlt handles incremental loading where supported
  • Use dango sync --full-refresh to reload all data if needed
  • Deduplication is applied during ingestion (see below)


Deduplication Strategies

Dango supports four deduplication strategies that control how duplicate records are handled during ingestion. Configure this per source in sources.yml:

sources:
  - name: orders
    type: csv
    csv:
      deduplication_strategy: latest_only
      primary_key: order_id
      timestamp_column: updated_at

Strategy Reference

Strategy    | Behavior                                                                                      | Use When
------------|-----------------------------------------------------------------------------------------------|--------------------------------------------------------------------------
none        | No deduplication. All records are kept, including duplicates.                                 | Data is pre-deduplicated or duplicates are acceptable.
latest_only | Keeps only the most recent record per primary key (based on timestamp column).                | You want current state only (e.g., latest customer info).
append_only | Appends all new records. No updates or deletes.                                               | You need a complete event log (e.g., transactions, clicks).
scd_type2   | Slowly Changing Dimension Type 2. Tracks historical changes with valid_from/valid_to columns. | You need full change history (e.g., price changes, status transitions).
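
To make the latest_only row concrete, the sketch below keeps one record per primary key and picks the row with the greatest timestamp. It is a plain-Python illustration of the behavior described in the table, not Dango's or dlt's implementation; the function name keep_latest and the sample rows are hypothetical.

```python
from typing import Any


def keep_latest(
    rows: list[dict[str, Any]],
    primary_key: str,
    timestamp_column: str,
) -> list[dict[str, Any]]:
    """Keep only the most recent row per primary key (illustrative only)."""
    latest: dict[Any, dict[str, Any]] = {}
    for row in rows:
        key = row[primary_key]
        current = latest.get(key)
        # Replace the stored row only when this one is strictly newer.
        if current is None or row[timestamp_column] > current[timestamp_column]:
            latest[key] = row
    return list(latest.values())


# Hypothetical sample data; ISO-8601 date strings compare correctly as text.
orders = [
    {"order_id": 1, "status": "pending", "updated_at": "2024-01-15"},
    {"order_id": 2, "status": "paid", "updated_at": "2024-01-16"},
    {"order_id": 1, "status": "paid", "updated_at": "2024-01-17"},
]

deduped = keep_latest(orders, primary_key="order_id", timestamp_column="updated_at")
```

Here order 1 appears twice, and only its 2024-01-17 version survives. With none or append_only, both versions would remain in the table.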

Choosing a strategy

  • Start with latest_only for most dimension tables (customers, products)
  • Use append_only for event/fact tables (orders, page views)
  • Use scd_type2 only when you need to query historical state at a point in time
  • Use none for one-time loads or pre-processed data
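
The point-in-time querying that motivates scd_type2 can be sketched in a few lines of Python. This is an illustration of the general Type 2 pattern under stated assumptions, not Dango's implementation: the helper names are hypothetical, and representing the current version with valid_to set to None is an assumption (some systems use a far-future date instead).

```python
from typing import Any, Optional


def apply_scd2_change(
    history: list[dict[str, Any]],
    key: Any,
    new_values: dict[str, Any],
    changed_at: str,
) -> None:
    """Close the open version for `key` (if any) and append a new one."""
    for version in history:
        if version["key"] == key and version["valid_to"] is None:
            if all(version.get(k) == v for k, v in new_values.items()):
                return  # Nothing changed; keep the open version as-is.
            version["valid_to"] = changed_at
    history.append(
        {"key": key, **new_values, "valid_from": changed_at, "valid_to": None}
    )


def state_as_of(
    history: list[dict[str, Any]], key: Any, when: str
) -> Optional[dict[str, Any]]:
    """Return the version of `key` that was valid at `when`, if any."""
    for version in history:
        if version["key"] != key:
            continue
        still_open = version["valid_to"] is None
        if version["valid_from"] <= when and (still_open or when < version["valid_to"]):
            return version
    return None


# Hypothetical price history for one product.
history: list[dict[str, Any]] = []
apply_scd2_change(history, "sku_1", {"price": 10}, "2024-01-01")
apply_scd2_change(history, "sku_1", {"price": 12}, "2024-02-01")

january_price = state_as_of(history, "sku_1", "2024-01-15")["price"]
current_price = state_as_of(history, "sku_1", "2024-03-01")["price"]
```

The table keeps both versions of the row, which is why scd_type2 costs more storage and query complexity than latest_only and is worth choosing only when historical state matters.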

Schema Naming Conventions

Raw schemas follow the pattern raw_{source_name}:

  • Source named stripe → schema raw_stripe
  • Source named my_google_sheets → schema raw_my_google_sheets

Table names within the schema come from the source (e.g., the Stripe source creates charges, customers, subscriptions).
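
If you build raw table references in scripts, the convention is simple enough to encode directly. The helpers below are hypothetical names used for illustration; they are not part of Dango's API.

```python
def raw_schema_name(source_name: str) -> str:
    """Return the raw schema for a source, following raw_{source_name}."""
    return f"raw_{source_name}"


def raw_table_name(source_name: str, table_name: str) -> str:
    """Return the qualified raw table, raw_{source_name}.{table_name}."""
    return f"{raw_schema_name(source_name)}.{table_name}"


stripe_schema = raw_schema_name("stripe")
sheets_schema = raw_schema_name("my_google_sheets")
charges_table = raw_table_name("stripe_payments", "charges")
```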


Staging Layer

Purpose

Create clean, consistently-structured tables from raw data suitable for analysis.

Auto-Generation

Staging models are auto-generated by Dango during sync:

dango sync
# Loads data AND generates staging models

This creates:

dbt/models/staging/
├── stg_stripe_charges.sql
├── stg_stripe_customers.sql
├── stg_stripe_subscriptions.sql
├── _stg_stripe__sources.yml      # Documents raw tables
└── _stg_stripe__schema.yml       # Column descriptions

Staging Model Contents

Auto-generated staging models provide:

  • Clean SELECT * from raw tables
  • Source documentation (YAML files)
  • Column descriptions

You can customize staging models by editing the generated SQL files. Remove the auto-generated comment at the top to prevent Dango from overwriting your changes.

Materialization

Staging models are materialized as tables (not views):

  • Better performance with Metabase queries
  • Consistent query results
  • Easier debugging


Intermediate Layer

Purpose

Reusable business logic that combines multiple staging tables.

User-Created

You write these models manually in dbt/models/intermediate/:

-- dbt/models/intermediate/int_customer_lifetime_value.sql
{{ config(materialized='table') }}

WITH charges AS (
    SELECT
        customer_id,
        SUM(amount) as total_spent,
        COUNT(*) as order_count,
        MIN(created) as first_order_date,
        MAX(created) as last_order_date
    FROM {{ ref('stg_stripe_charges') }}
    WHERE status = 'succeeded'
    GROUP BY customer_id
)

SELECT * FROM charges

When to Use

Use intermediate models for:

  • Complex joins across multiple tables
  • Aggregations used in multiple marts
  • Business logic shared across reports
  • Performance optimization (pre-compute expensive operations)

Example: Customer Segmentation

-- dbt/models/intermediate/int_customer_segments.sql
{{ config(materialized='table') }}

WITH ltv AS (
    SELECT * FROM {{ ref('int_customer_lifetime_value') }}
),

activity AS (
    SELECT
        customer_id,
        -- Carry these through so the final SELECT can reference them
        total_spent,
        order_count,
        DATEDIFF('day', last_order_date, CURRENT_DATE) as days_since_last_order
    FROM ltv
)

SELECT
    customer_id,
    total_spent,
    order_count,
    days_since_last_order,
    CASE
        WHEN total_spent > 10000 THEN 'VIP'
        WHEN total_spent > 1000 THEN 'Premium'
        WHEN days_since_last_order > 90 THEN 'At Risk'
        ELSE 'Standard'
    END as customer_segment
FROM activity
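
A CASE expression returns the first branch that matches, so the order of the WHEN clauses above is part of the business logic. The Python sketch below mirrors those branches to show the consequence; the function name is hypothetical and the thresholds are copied from the model.

```python
def customer_segment(total_spent: float, days_since_last_order: int) -> str:
    """Mirror the CASE expression: the first matching branch wins."""
    if total_spent > 10000:
        return "VIP"
    if total_spent > 1000:
        return "Premium"
    if days_since_last_order > 90:
        return "At Risk"
    return "Standard"


# Both customers last ordered 200 days ago; only the low spender is flagged.
lapsed_big_spender = customer_segment(total_spent=15000, days_since_last_order=200)
lapsed_small_spender = customer_segment(total_spent=300, days_since_last_order=200)
```

A high-spending customer who has gone quiet stays in VIP because the spend checks run first. If lapsed customers should be flagged regardless of spend, move the days_since_last_order branch to the top of the CASE.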

Marts Layer

Purpose

Final, analytics-ready tables optimized for BI tools and reporting.

User-Created

You write these models in dbt/models/marts/:

-- dbt/models/marts/customer_metrics.sql
{{ config(materialized='table') }}

WITH customers AS (
    SELECT * FROM {{ ref('stg_stripe_customers') }}
),

ltv AS (
    SELECT * FROM {{ ref('int_customer_lifetime_value') }}
),

segments AS (
    SELECT * FROM {{ ref('int_customer_segments') }}
)

SELECT
    c.id,
    c.email,
    c.name,
    c.created as customer_since,
    ltv.total_spent,
    ltv.order_count,
    ltv.first_order_date,
    ltv.last_order_date,
    segments.customer_segment,
    segments.days_since_last_order
FROM customers c
LEFT JOIN ltv ON c.id = ltv.customer_id
LEFT JOIN segments ON c.id = segments.customer_id

Design Guidelines

Denormalized for BI:

  • One wide table per business area
  • Pre-joined dimensions
  • Pre-calculated metrics
  • Optimized for SELECT queries

Materialized as tables:

  • Faster query performance in Metabase
  • More storage, but worth it for end-users

Examples of mart tables:

  • customer_metrics - Customer analytics
  • daily_sales - Sales performance
  • product_performance - Product analytics
  • churn_analysis - Retention metrics


Complete Example: Stripe Data Flow

Let's trace how Stripe payment data flows through all layers:

1. Raw Layer

dlt loads data:

dango sync stripe_payments

Tables created:

-- Raw charges
SELECT * FROM raw_stripe_payments.charges LIMIT 3;

Output:

id          | customer_id | amount | currency | created    | _dlt_load_id
------------|-------------|--------|----------|------------|-------------
ch_001      | cus_A       | 1000   | usd      | 2024-01-15 | 1734567890
ch_002      | cus_B       | 2500   | usd      | 2024-01-16 | 1734567890
ch_003      | cus_A       | 500    | usd      | 2024-01-17 | 1734567999

2. Staging Layer

Staging models auto-generated during sync:

SELECT * FROM staging.stg_stripe_payments__charges LIMIT 3;

Output:

id     | customer_id | amount | currency | created
-------|-------------|--------|----------|------------
ch_001 | cus_A       | 1000   | usd      | 2024-01-15
ch_002 | cus_B       | 2500   | usd      | 2024-01-16
ch_003 | cus_A       | 500    | usd      | 2024-01-17

3. Intermediate Layer

Create reusable logic:

-- dbt/models/intermediate/int_customer_ltv.sql
SELECT
    customer_id,
    SUM(amount) / 100.0 as total_spent_usd,
    COUNT(*) as purchase_count
FROM {{ ref('stg_stripe_charges') }}
GROUP BY customer_id

Result:

SELECT * FROM intermediate.int_customer_ltv;

Output:

customer_id | total_spent_usd | purchase_count
------------|-----------------|---------------
cus_A       | 15.00           | 2
cus_B       | 25.00           | 1
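
The arithmetic in this step can be checked without a Dango project. The sketch below loads the three sample charges into an in-memory SQLite database and runs the same aggregation. SQLite is used only so the snippet runs anywhere, and the table name charges is a stand-in for the staging model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE charges (id TEXT, customer_id TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO charges VALUES (?, ?, ?)",
    [
        ("ch_001", "cus_A", 1000),
        ("ch_002", "cus_B", 2500),
        ("ch_003", "cus_A", 500),
    ],
)

# Same aggregation as the intermediate model; amounts are in cents.
ltv = conn.execute(
    """
    SELECT
        customer_id,
        SUM(amount) / 100.0 AS total_spent_usd,
        COUNT(*) AS purchase_count
    FROM charges
    GROUP BY customer_id
    ORDER BY customer_id
    """
).fetchall()
conn.close()
```

cus_A has two charges of 1000 and 500 cents, which sum to 1500 cents, or 15.00 USD. cus_B has a single 2500-cent charge, or 25.00 USD.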

4. Marts Layer

Build final metrics table:

-- dbt/models/marts/customer_metrics.sql
WITH customers AS (
    SELECT * FROM {{ ref('stg_stripe_customers') }}
),

ltv AS (
    SELECT * FROM {{ ref('int_customer_ltv') }}
)

SELECT
    c.id,
    c.email,
    c.created as customer_since,
    COALESCE(ltv.total_spent_usd, 0) as lifetime_value,
    COALESCE(ltv.purchase_count, 0) as total_purchases
FROM customers c
LEFT JOIN ltv ON c.id = ltv.customer_id

Query in Metabase:

SELECT * FROM marts.customer_metrics
ORDER BY lifetime_value DESC
LIMIT 10;

Output:

id    | email          | customer_since | lifetime_value | total_purchases
------|----------------|----------------|----------------|----------------
cus_B | [email protected]   | 2024-01-10     | 25.00          | 1
cus_A | [email protected] | 2024-01-05     | 15.00          | 2


Schema Evolution

dlt Sources (API-based)

When a source API adds a new field, dlt auto-detects it:

Before:

SELECT id, email FROM raw_stripe_payments.customers;

After API update:

SELECT id, email, phone FROM raw_stripe_payments.customers;
-- 'phone' column automatically added by dlt

Run dango sync again to update staging models with new columns.
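
When deciding whether downstream models need updating, it helps to list exactly which columns appeared. A minimal sketch, assuming you have captured the column names before and after the sync yourself (the function name is hypothetical):

```python
def added_columns(before: list[str], after: list[str]) -> list[str]:
    """Return columns present in `after` but not in `before`, in order."""
    known = set(before)
    return [column for column in after if column not in known]


new_cols = added_columns(["id", "email"], ["id", "email", "phone"])
```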

CSV Sources

CSV sources require consistent schemas:

  • New columns in CSV files will cause a validation error
  • To add columns, update all CSV files to include the new columns
  • Or use dango sync --full-refresh to reload with the new schema
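
Header drift between CSV files can be caught before syncing. The sketch below is a hypothetical pre-flight check, not Dango's own validation: it compares each file's header row with the first file's and reports any that differ. It takes CSV text directly to stay self-contained; a real script would read the files from disk.

```python
import csv
import io
from typing import Optional


def inconsistent_headers(files: dict[str, str]) -> dict[str, list[str]]:
    """Return files whose header row differs from the first file's header.

    `files` maps a file name to its CSV text.
    """
    expected: Optional[list[str]] = None
    mismatches: dict[str, list[str]] = {}
    for name, text in files.items():
        # The header is the first parsed row; an empty file yields [].
        header = next(csv.reader(io.StringIO(text)), [])
        if expected is None:
            expected = header
        elif header != expected:
            mismatches[name] = header
    return mismatches


# Hypothetical files: the February export gained a "coupon" column.
sample = {
    "orders_jan.csv": "order_id,amount\n1,10\n",
    "orders_feb.csv": "order_id,amount,coupon\n2,20,SAVE5\n",
}
problems = inconsistent_headers(sample)
```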

Handling Schema Changes

Best practices:

  1. Run dango sync to get new data and update staging models
  2. Update intermediate/marts models manually if needed
  3. Run dango run to apply transformations


Querying Guidelines

Query Staging or Marts?

Use Case              | Query Layer                     | Reason
----------------------|---------------------------------|--------------------------
Ad-hoc exploration    | staging.*                       | Fast, simple, all columns
Production dashboards | marts.*                         | Optimized, pre-joined
Data quality checks   | raw_{source}.*                  | See original source data
Debugging pipelines   | raw_{source}.* + _dlt_* columns | Audit trail

Example: Debugging

-- Check if data is landing in raw
SELECT COUNT(*), MAX(_dlt_extracted_at)
FROM raw_stripe_payments.charges;

-- Compare raw vs staging row counts
SELECT 'raw' as layer, COUNT(*) as rows
FROM raw_stripe_payments.charges
UNION ALL
SELECT 'staging', COUNT(*)
FROM staging.stg_stripe_payments__charges;

Next Steps