Traditional workflow orchestrators like Airflow think in terms of tasks—actions to perform in sequence. Dagster takes a different approach: it focuses on data assets, the actual data products your pipeline produces. This shift fundamentally changes how you design, test, and maintain data pipelines.
Why Assets Matter
Consider the difference:
Task-based (traditional):
download_task >> process_task >> validate_task
What data does process_task produce? Where is it stored? You have to read the code to find out.
Asset-based (Dagster):
@asset
def silver_ais_data(bronze_ais_data):
    """Produces silver/ais/dt=2024-01-15/*.parquet"""
    return transform(bronze_ais_data)
The asset declaration tells you exactly what data exists, what it depends on, and where to find it. The asset graph becomes your documentation.
Benefits of Asset-Based Design
Data is the product: You care about the Parquet files in your data lake, not the task that created them.
Self-documenting: The asset graph shows data lineage visually. New team members can understand the pipeline by exploring Dagit.
Testable: Assets are Python functions. You can unit test them without complex mocking.
Incremental: Re-materialize only changed assets. Dagster tracks what’s up-to-date.
Partition-aware: Process historical data with backfills—Dagster handles parallelization and failure recovery.
Core Concepts
Assets
An asset is a data product your pipeline produces:
from dagster import asset
@asset
def dma_ais_bronze(context):
    """Download DMA AIS data to bronze layer.

    Produces: s3://ais-lake/bronze/dma/aisdk-2024-01-15.csv
    """
    # Download logic here
    return "bronze/dma/aisdk-2024-01-15.csv"
Key points:
- Asset name = function name (dma_ais_bronze)
- Docstring explains what the asset produces
- Return value is typically a path or metadata
- context provides runtime information
Dependencies
Assets depend on other assets, forming a lineage graph:
@asset
def bronze_data():
    """Raw CSV from source."""
    return download_csv()

@asset
def silver_data(bronze_data):  # Depends on bronze_data
    """Cleaned Parquet from CSV."""
    return convert_to_parquet(bronze_data)

@asset
def gold_data(silver_data):  # Depends on silver_data
    """Curated dataset."""
    return aggregate(silver_data)
Dagster automatically builds the dependency graph:
bronze_data → silver_data → gold_data
When you materialize gold_data, Dagster materializes dependencies first if they’re missing.
Partitions
Partitions let you process data incrementally:
from dagster import asset, DailyPartitionsDefinition

daily_partition = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partition)
def daily_ais_data(context):
    """Download AIS data for a specific day."""
    partition_date = context.partition_key  # e.g., "2024-01-15"
    return download_day(partition_date)
Benefits:
- Incremental processing: Only process new days
- Backfills: Fill in historical data with one command
- Parallelization: Process multiple partitions concurrently
- Recovery: Re-run failed partitions without touching successful ones
Resources
Resources are external services your assets need:
from dagster import resource
from minio import Minio
@resource(
    config_schema={
        "endpoint": str,
        "access_key": str,
        "secret_key": str,
    }
)
def minio_resource(context):
    """MinIO S3-compatible storage client."""
    return Minio(
        endpoint=context.resource_config["endpoint"],
        access_key=context.resource_config["access_key"],
        secret_key=context.resource_config["secret_key"],
        secure=False,
    )
Using resources in assets:
@asset(required_resource_keys={"minio_client"})
def my_asset(context):
    client = context.resources.minio_client
    ...  # Use client to upload/download
Why use resources?
- Configuration: Different configs for dev/prod
- Testing: Mock resources in tests
- Lifecycle: Open connections once, reuse across assets
Building Your First Asset
Let’s create an asset that downloads AIS data.
Step 1: Define Partitions
# partitions.py
from dagster import DailyPartitionsDefinition

daily_partition = DailyPartitionsDefinition(
    start_date="2024-01-01"
)
Step 2: Create MinIO Resource
# resources/minio_resource.py
from dagster import resource
from minio import Minio

@resource(
    config_schema={
        "endpoint": str,
        "access_key": str,
        "secret_key": str,
        "secure": bool,
    }
)
def minio_resource(context):
    """MinIO S3-compatible storage client."""
    return Minio(
        endpoint=context.resource_config["endpoint"],
        access_key=context.resource_config["access_key"],
        secret_key=context.resource_config["secret_key"],
        secure=context.resource_config["secure"],
    )
Step 3: Create Download Asset
# assets/ingestion.py
from datetime import datetime

from dagster import asset
from ingestion.downloaders import DMADownloader

from ..partitions import daily_partition

@asset(
    partitions_def=daily_partition,
    required_resource_keys={"minio_client"},
    description="Download DMA AIS data for a specific day to bronze layer",
)
def dma_ais_bronze(context):
    """Download DMA AIS CSV to bronze staging area.

    Partition: Daily (one file per day)
    Output: s3://ais-lake/bronze/dma/aisdk-YYYY-MM-DD.csv
    """
    partition_date_str = context.partition_key
    partition_date = datetime.strptime(partition_date_str, "%Y-%m-%d").date()

    minio_client = context.resources.minio_client
    downloader = DMADownloader(minio_client, bucket="ais-lake")
    object_path = downloader.download_day(partition_date)

    context.log.info(f"Downloaded DMA data to: {object_path}")
    return object_path
Step 4: Create Definitions
# repository.py
import os

from dagster import Definitions, load_assets_from_modules

from . import assets
from .resources import minio_resource

all_assets = load_assets_from_modules([assets])

resources = {
    "minio_client": minio_resource.configured({
        "endpoint": os.getenv("MINIO_ENDPOINT", "minio:9000"),
        "access_key": os.getenv("MINIO_ACCESS_KEY"),
        "secret_key": os.getenv("MINIO_SECRET_KEY"),
        "secure": False,
    })
}

defs = Definitions(
    assets=all_assets,
    resources=resources,
)
Testing Locally
Before deploying to Kubernetes, always test locally.
Start Dagit
# Set environment variables
export MINIO_ENDPOINT="192.168.1.240:9000"
export MINIO_ACCESS_KEY="your-access-key"
export MINIO_SECRET_KEY="your-secret-key"
# Start Dagster UI
dagster dev -m src.pipeline.repository
Open http://localhost:3000 in your browser.
Materialize an Asset
In Dagit:
1. Click Assets in the sidebar
2. Select dma_ais_bronze
3. Click Materialize
4. Select a partition (e.g., 2024-01-15)
5. Click Launch Run
Watch the logs to see the download progress, then verify the file exists in MinIO.
Asset Dependencies in Action
Create a dependent asset that converts bronze to silver:
# assets/bronze_to_silver.py
from dagster import asset, AssetIn

from ..partitions import daily_partition

@asset(
    partitions_def=daily_partition,
    required_resource_keys={"minio_client", "duckdb_conn"},
    ins={"dma_ais_bronze": AssetIn()},
    description="Convert DMA bronze CSV to silver Parquet",
)
def dma_ais_silver(context, dma_ais_bronze):
    """Convert DMA CSV to partitioned Parquet.

    Input:  s3://ais-lake/bronze/dma/aisdk-YYYY-MM-DD.csv
    Output: s3://ais-lake/silver/ais/source=dma/dt=YYYY-MM-DD/*.parquet
    """
    partition_date = context.partition_key

    # Convert using DuckDB
    duckdb_conn = context.resources.duckdb_conn
    # ... conversion logic

    return f"silver/ais/source=dma/dt={partition_date}/"
Now when you materialize dma_ais_silver, Dagster will:
1. Check whether dma_ais_bronze exists for that partition
2. If not, materialize dma_ais_bronze first
3. Then materialize dma_ais_silver
Backfilling Historical Data
One of Dagster’s most powerful features is backfilling.
Via Dagit
1. Go to Assets → dma_ais_bronze
2. Click Materialize → Launch backfill
3. Select date range: 2024-01-01 to 2024-01-31
4. Click Launch
Dagster creates 31 runs, executes them in parallel (respecting concurrency limits), tracks progress, and allows retrying failed partitions.
Via Command Line
dagster asset backfill \
  --module src.pipeline.repository \
  --asset-key dma_ais_bronze \
  --from 2024-01-01 \
  --to 2024-01-31
Adding Observability
Asset Metadata
Attach metadata to track what your assets produce:
from dagster import Output, MetadataValue, asset

@asset
def my_asset(context):
    result = process_data()
    return Output(
        value=result,
        metadata={
            "num_rows": MetadataValue.int(len(result)),
            "file_size_mb": MetadataValue.float(result.size / 1024 / 1024),
        },
    )
Metadata appears in Dagit next to the asset, making it easy to spot anomalies.
Asset Checks
Add data quality checks:
from dagster import asset_check, AssetCheckResult

@asset_check(asset=dma_ais_silver)
def check_row_count(context):
    """Ensure we have at least 10k rows per day."""
    row_count = get_row_count(context.partition_key)
    return AssetCheckResult(
        passed=row_count >= 10_000,
        metadata={"row_count": row_count},
    )
Best Practices
Keep Assets Pure
Assets should be deterministic and idempotent:
# Bad - uses current time
@asset
def bad_asset():
    return f"data_{datetime.now()}.csv"

# Good - uses partition key
@asset(partitions_def=daily_partition)
def good_asset(context):
    return f"data_{context.partition_key}.csv"
Use Descriptive Names
Asset names should describe what they produce, not how:
- Bad: download_and_process_task
- Good: dma_ais_silver
Partition Thoughtfully
Partition when:
- Data is naturally incremental (daily ingestion)
- Processing can be parallelized
- You need backfill capability
Don’t partition when:
- Partitions would be tiny (<100MB)
- Dependencies exist between partitions
- Data isn’t naturally incremental
Test Before Deploying
Always:
1. Run dagster dev -m src.pipeline.repository
2. Materialize a single partition
3. Verify output in MinIO
4. Then deploy to Kubernetes
What You’ve Learned
Dagster’s asset-based model makes data pipelines:
- Easier to understand: The asset graph shows data lineage
- Easier to test: Assets are just Python functions
- Easier to maintain: Dependencies are explicit
- Easier to operate: Backfills and retries are built in
The key insight: focus on the data products you’re creating, not the tasks that create them. Let Dagster handle orchestration while you define what data should exist.
What’s Next
Now that you understand Dagster fundamentals, Chapter 6 covers the bronze-to-silver transformation—converting raw CSV files to optimized Parquet using DuckDB.
Building data pipelines for your organization? We help companies implement production-grade orchestration. Get in touch to discuss your project.