Traditional workflow orchestrators like Airflow think in terms of tasks—actions to perform in sequence. Dagster takes a different approach: it focuses on data assets, the actual data products your pipeline produces. This shift fundamentally changes how you design, test, and maintain data pipelines.

Why Assets Matter

Consider the difference:

Task-based (traditional):

download_task >> process_task >> validate_task

What data does process_task produce? Where is it stored? You have to read the code to find out.

Asset-based (Dagster):

@asset
def silver_ais_data(bronze_ais_data):
    """Produces silver/ais/dt=2024-01-15/*.parquet"""
    return transform(bronze_ais_data)

The asset declaration tells you exactly what data exists, what it depends on, and where to find it. The asset graph becomes your documentation.

Benefits of Asset-Based Design

Data is the product: You care about the Parquet files in your data lake, not the task that created them.

Self-documenting: The asset graph shows data lineage visually. New team members can understand the pipeline by exploring the graph in Dagit, Dagster’s web UI.

Testable: Assets are Python functions. You can unit test them without complex mocking.

Incremental: Re-materialize only changed assets. Dagster tracks what’s up-to-date.

Partition-aware: Process historical data with backfills—Dagster handles parallelization and failure recovery.

Core Concepts

Assets

An asset is a data product your pipeline produces:

from dagster import asset

@asset
def dma_ais_bronze(context):
    """Download DMA AIS data to bronze layer.

    Produces: s3://ais-lake/bronze/dma/aisdk-2024-01-15.csv
    """
    # Download logic here
    return "bronze/dma/aisdk-2024-01-15.csv"

Key points:

  • Asset name = function name (dma_ais_bronze)
  • Docstring explains what it produces
  • Return value is handed to Dagster’s I/O manager; in this pipeline it is an object path rather than the data itself
  • context provides runtime information

Dependencies

Assets depend on other assets, forming a lineage graph:

@asset
def bronze_data():
    """Raw CSV from source."""
    return download_csv()

@asset
def silver_data(bronze_data):  # Depends on bronze_data
    """Cleaned Parquet from CSV."""
    return convert_to_parquet(bronze_data)

@asset
def gold_data(silver_data):  # Depends on silver_data
    """Curated dataset."""
    return aggregate(silver_data)

Dagster automatically builds the dependency graph:

bronze_data → silver_data → gold_data

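When you materialize all three assets together, Dagster runs them in dependency order. Materializing gold_data on its own runs only gold_data: Dagster loads the stored output of silver_data instead of recomputing it, so upstream assets must already be materialized or be included in the selection.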
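To make the inference concrete, here is a minimal plain-Python sketch of how parameter names alone can define a dependency graph. It illustrates the idea only and is not Dagster's implementation; the placeholder function bodies and the execution_order helper are assumptions made for this example.

```python
import inspect
from graphlib import TopologicalSorter


# Stand-in functions: the bodies are placeholders, only the signatures matter.
def bronze_data():
    return "raw"


def silver_data(bronze_data):
    return f"clean({bronze_data})"


def gold_data(silver_data):
    return f"agg({silver_data})"


def execution_order(functions):
    """Order functions so each one runs after the ones named in its parameters."""
    graph = {
        fn.__name__: set(inspect.signature(fn).parameters)
        for fn in functions
    }
    return list(TopologicalSorter(graph).static_order())


# The input order is deliberately shuffled; the graph decides the result.
order = execution_order([gold_data, bronze_data, silver_data])
print(order)  # ['bronze_data', 'silver_data', 'gold_data']
```

Because the graph comes from the signatures, renaming a parameter is enough to rewire a dependency, which is why asset names deserve some care.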

Partitions

Partitions let you process data incrementally:

from dagster import asset, DailyPartitionsDefinition

daily_partition = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partition)
def daily_ais_data(context):
    """Download AIS data for a specific day."""
    partition_date = context.partition_key  # e.g., "2024-01-15"
    return download_day(partition_date)

Benefits:

  • Incremental processing: Only process new days
  • Backfills: Fill in historical data with one command
  • Parallelization: Process multiple partitions concurrently
  • Recovery: Re-run failed partitions without touching successful ones
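The incremental idea behind these benefits can be sketched with the standard library alone. The helpers below are hypothetical and only mimic the bookkeeping; Dagster keeps track of materialized partitions for you.

```python
from datetime import date, timedelta


def daily_partition_keys(start, end):
    """Return one 'YYYY-MM-DD' key per day from start to end, inclusive."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days + 1)]


def missing_partitions(all_keys, materialized):
    """Keys that still need processing, in chronological order."""
    done = set(materialized)
    return [key for key in all_keys if key not in done]


keys = daily_partition_keys(date(2024, 1, 1), date(2024, 1, 5))
todo = missing_partitions(
    keys, materialized={"2024-01-01", "2024-01-02", "2024-01-04"}
)
print(todo)  # ['2024-01-03', '2024-01-05']
```

Only the two missing days need work; the three days already processed are left alone.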

Resources

Resources are external services your assets need:

from dagster import resource
from minio import Minio

@resource(
    config_schema={
        "endpoint": str,
        "access_key": str,
        "secret_key": str,
    }
)
def minio_resource(context):
    """MinIO S3-compatible storage client."""
    return Minio(
        endpoint=context.resource_config["endpoint"],
        access_key=context.resource_config["access_key"],
        secret_key=context.resource_config["secret_key"],
        secure=False
    )

Using resources in assets:

@asset(required_resource_keys={"minio_client"})
def my_asset(context):
    client = context.resources.minio_client
    # Use client to upload/download

Why use resources?

  • Configuration: Different configs for dev/prod
  • Testing: Mock resources in tests
  • Lifecycle: Open connections once, reuse across assets
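The testing benefit is easiest to see with a fake client. The sketch below assumes the asset only needs a put_object method shaped like the MinIO client's; FakeObjectStore, upload_marker, and the marker object name are made up for illustration, and a plain namespace stands in for the Dagster context so the example runs without Dagster installed.

```python
import io
from types import SimpleNamespace


class FakeObjectStore:
    """In-memory stand-in for a storage client; keeps uploads in a dict."""

    def __init__(self):
        self.objects = {}

    def put_object(self, bucket_name, object_name, data, length):
        self.objects[(bucket_name, object_name)] = data.read(length)


def upload_marker(context):
    """Asset-style body: fetch the client from context.resources and upload."""
    client = context.resources.minio_client
    payload = b"ok"
    client.put_object(
        "ais-lake", "bronze/_marker.txt", io.BytesIO(payload), len(payload)
    )
    return "bronze/_marker.txt"


# Stand-in for the Dagster context: only the attribute path used above exists.
fake = FakeObjectStore()
context = SimpleNamespace(resources=SimpleNamespace(minio_client=fake))
path = upload_marker(context)
print(path, fake.objects)
```

With Dagster installed, build_asset_context(resources={"minio_client": fake}) should be able to provide the context in place of the namespace.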

Building Your First Asset

Let’s create an asset that downloads AIS data.

Step 1: Define Partitions

# partitions.py
from dagster import DailyPartitionsDefinition

daily_partition = DailyPartitionsDefinition(
    start_date="2024-01-01"
)

Step 2: Create MinIO Resource

# resources/minio_resource.py
from dagster import resource
from minio import Minio

@resource(
    config_schema={
        "endpoint": str,
        "access_key": str,
        "secret_key": str,
        "secure": bool,
    }
)
def minio_resource(context):
    """MinIO S3-compatible storage client."""
    return Minio(
        endpoint=context.resource_config["endpoint"],
        access_key=context.resource_config["access_key"],
        secret_key=context.resource_config["secret_key"],
        secure=context.resource_config["secure"],
    )

Step 3: Create Download Asset

# assets/ingestion.py
from dagster import asset
from datetime import datetime
from ingestion.downloaders import DMADownloader
from ..partitions import daily_partition

@asset(
    partitions_def=daily_partition,
    required_resource_keys={"minio_client"},
    description="Download DMA AIS data for a specific day to bronze layer"
)
def dma_ais_bronze(context):
    """Download DMA AIS CSV to bronze staging area.

    Partition: Daily (one file per day)
    Output: s3://ais-lake/bronze/dma/aisdk-YYYY-MM-DD.csv
    """
    partition_date_str = context.partition_key
    partition_date = datetime.strptime(partition_date_str, "%Y-%m-%d").date()

    minio_client = context.resources.minio_client
    downloader = DMADownloader(minio_client, bucket="ais-lake")
    object_path = downloader.download_day(partition_date)

    context.log.info(f"Downloaded DMA data to: {object_path}")
    return object_path
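The source does not show DMADownloader, so the following is only a guess at its path-building step, based on the output format given in the docstring. The bronze_object_path helper is hypothetical.

```python
from datetime import datetime


def bronze_object_path(partition_key):
    """Map a 'YYYY-MM-DD' partition key to the bronze object path.

    Parsing the key first means a malformed key raises ValueError
    instead of producing a bogus object name.
    """
    day = datetime.strptime(partition_key, "%Y-%m-%d").date()
    return f"bronze/dma/aisdk-{day.isoformat()}.csv"


print(bronze_object_path("2024-01-15"))  # bronze/dma/aisdk-2024-01-15.csv
```

Keeping the mapping in one small function lets you test the naming convention without touching MinIO.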

Step 4: Create Definitions

# repository.py
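from dagster import Definitions, load_assets_from_package_module
from . import assets
# Import the resource function itself, not the module that contains it
from .resources.minio_resource import minio_resource
import os

# assets is a package (assets/ingestion.py, ...), so load from its submodules
all_assets = load_assets_from_package_module(assets)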

resources = {
    "minio_client": minio_resource.configured({
        "endpoint": os.getenv("MINIO_ENDPOINT", "minio:9000"),
        "access_key": os.getenv("MINIO_ACCESS_KEY"),
        "secret_key": os.getenv("MINIO_SECRET_KEY"),
        "secure": False,
    })
}

defs = Definitions(
    assets=all_assets,
    resources=resources,
)
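One caveat: os.getenv returns None when a variable is unset, and None would likely be rejected by a config field declared as str. A small guard can turn that into a clearer error. This is a sketch; minio_config_from_env and REQUIRED_VARS are hypothetical names, not part of Dagster.

```python
import os

REQUIRED_VARS = ("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY")


def minio_config_from_env(env=None):
    """Build the resource config dict, failing fast if credentials are unset."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing environment variables: {', '.join(missing)}"
        )
    return {
        "endpoint": env.get("MINIO_ENDPOINT", "minio:9000"),
        "access_key": env["MINIO_ACCESS_KEY"],
        "secret_key": env["MINIO_SECRET_KEY"],
        "secure": False,
    }


# Passing a plain dict keeps the example independent of the real environment.
config = minio_config_from_env(
    {"MINIO_ACCESS_KEY": "example-key", "MINIO_SECRET_KEY": "example-secret"}
)
print(config["endpoint"])  # minio:9000
```

The returned dict has the same shape as the one passed to minio_resource.configured above.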

Testing Locally

Before deploying to Kubernetes, always test locally.

Start Dagit

# Set environment variables
export MINIO_ENDPOINT="192.168.1.240:9000"
export MINIO_ACCESS_KEY="your-access-key"
export MINIO_SECRET_KEY="your-secret-key"

# Start Dagster UI
dagster dev -m src.pipeline.repository

Open http://localhost:3000 in your browser.

Materialize an Asset

In Dagit:

  1. Click Assets in the sidebar
  2. Select dma_ais_bronze
  3. Click Materialize
  4. Select a partition (e.g., 2024-01-15)
  5. Click Launch Run

Watch the logs to see the download progress, then verify the file exists in MinIO.

Asset Dependencies in Action

Create a dependent asset that converts bronze to silver:

# assets/bronze_to_silver.py
from dagster import asset, AssetIn
from ..partitions import daily_partition

@asset(
    partitions_def=daily_partition,
    required_resource_keys={"minio_client", "duckdb_conn"},
    ins={"dma_ais_bronze": AssetIn()},
    description="Convert DMA bronze CSV to silver Parquet"
)
def dma_ais_silver(context, dma_ais_bronze):
    """Convert DMA CSV to partitioned Parquet.

    Input: s3://ais-lake/bronze/dma/aisdk-YYYY-MM-DD.csv
    Output: s3://ais-lake/silver/ais/source=dma/dt=YYYY-MM-DD/*.parquet
    """
    partition_date = context.partition_key

    # Convert using DuckDB
    duckdb_conn = context.resources.duckdb_conn
    # ... conversion logic

    return f"silver/ais/source=dma/dt={partition_date}/"

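Dagster now knows that each dma_ais_silver partition depends on the same-day partition of dma_ais_bronze. When you select both assets and materialize a partition, Dagster will:

  1. Materialize dma_ais_bronze for that partition
  2. Pass its output to dma_ais_silver through the I/O manager
  3. Materialize dma_ais_silver

If you materialize dma_ais_silver on its own, Dagster does not run dma_ais_bronze for you. It loads the bronze output already stored for that partition, so the upstream partition must have been materialized first.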

Backfilling Historical Data

One of Dagster’s most powerful features is backfilling.

Via Dagit

  1. Go to Assets → dma_ais_bronze
  2. Click Materialize → Launch backfill
  3. Select date range: 2024-01-01 to 2024-01-31
  4. Click Launch

Dagster creates 31 runs, executes them in parallel (respecting concurrency limits), tracks progress, and allows retrying failed partitions.
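The semantics described here (one run per partition, bounded concurrency, retry only what failed) can be sketched in plain Python. This is a toy model under those assumptions, not how Dagster schedules runs; run_backfill and the simulated flaky partitions are invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def run_backfill(partition_keys, process, max_concurrent=4):
    """Run process(key) for every key; return the keys whose run raised."""

    def attempt(key):
        try:
            process(key)
            return key, True
        except Exception:
            return key, False

    # pool.map preserves input order, so failures come back chronologically.
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        results = list(pool.map(attempt, partition_keys))
    return [key for key, ok in results if not ok]


keys = [f"2024-01-{day:02d}" for day in range(1, 32)]
flaky = {"2024-01-07", "2024-01-19"}  # simulated transient failures
attempts = []


def process(key):
    attempts.append(key)
    # Fail only on the first attempt for the flaky partitions.
    if key in flaky and attempts.count(key) == 1:
        raise RuntimeError(f"transient failure for {key}")


failed = run_backfill(keys, process)
still_failed = run_backfill(failed, process)  # retry only the failures
print(failed, still_failed)
```

The retry touches two partitions rather than all 31, which is the property that makes large backfills practical.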

Via Command Line

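# dagster asset materialize handles one partition per invocation,
# so loop over the dates to cover a range
for day in $(seq -w 1 31); do
  dagster asset materialize \
    -m src.pipeline.repository \
    --select dma_ais_bronze \
    --partition "2024-01-${day}"
done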

Adding Observability

Asset Metadata

Attach metadata to track what your assets produce:

from dagster import asset, Output, MetadataValue

@asset
def my_asset(context):
    result = process_data()

    return Output(
        value=result,
        metadata={
            "num_rows": MetadataValue.int(len(result)),
            "file_size_mb": MetadataValue.float(result.size / 1024 / 1024),
        }
    )

Metadata appears in Dagit next to the asset, making it easy to spot anomalies.
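If the asset writes a file, the two values above can be computed from the file itself. The helper below is a sketch with hypothetical names (csv_metadata, the sample columns) and assumes a CSV with a single header row.

```python
import csv
import os
import tempfile


def csv_metadata(path):
    """Return row count (excluding the header) and size in MB for a CSV file."""
    with open(path, newline="") as handle:
        num_rows = sum(1 for _ in csv.reader(handle)) - 1
    return {
        "num_rows": max(num_rows, 0),  # an empty file has zero data rows
        "file_size_mb": os.path.getsize(path) / 1024 / 1024,
    }


# Build a tiny sample file so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "sample.csv")
    with open(sample, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["mmsi", "lat", "lon"])
        writer.writerows([[1, 55.0, 12.0], [2, 56.0, 11.5], [3, 57.1, 10.2]])
    meta = csv_metadata(sample)

print(meta["num_rows"])  # 3
```

The resulting dict can be passed to the metadata argument, wrapped in MetadataValue.int and MetadataValue.float as in the asset above.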

Asset Checks

Add data quality checks:

from dagster import asset_check, AssetCheckResult

@asset_check(asset=dma_ais_silver)
def check_row_count(context):
    """Ensure we have at least 10k rows per day."""
    row_count = get_row_count(context.partition_key)

    return AssetCheckResult(
        passed=row_count >= 10_000,
        metadata={"row_count": row_count}
    )
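Keeping the threshold logic in a plain function makes the check itself easy to test. This sketch uses a hypothetical evaluate_row_count helper whose result mirrors the passed and metadata fields given to AssetCheckResult.

```python
def evaluate_row_count(row_count, minimum=10_000):
    """Decide whether a partition has enough rows and report the numbers."""
    return {
        "passed": row_count >= minimum,
        "metadata": {"row_count": row_count, "minimum": minimum},
    }


print(evaluate_row_count(9_999)["passed"])   # False
print(evaluate_row_count(10_000)["passed"])  # True
```

The boundary case matters: exactly 10,000 rows passes, matching the "at least 10k" wording of the check.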

Best Practices

Keep Assets Pure

Assets should be deterministic and idempotent:

# Bad - uses current time
@asset
def bad_asset():
    return f"data_{datetime.now()}.csv"

# Good - uses partition key
@asset(partitions_def=daily_partition)
def good_asset(context):
    return f"data_{context.partition_key}.csv"
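The practical consequence shows up on re-runs. In this sketch a dict stands in for the object store and the clock is passed in explicitly so the behavior is reproducible; both writer functions are hypothetical.

```python
from datetime import datetime


def write_timestamped(store, now):
    """Name depends on the wall clock: every re-run creates a new object."""
    store[f"data_{now.isoformat()}.csv"] = "rows"


def write_partitioned(store, partition_key):
    """Name depends only on the partition: a re-run overwrites in place."""
    store[f"data_{partition_key}.csv"] = "rows"


timestamped, partitioned = {}, {}

# Simulate running each asset twice, one minute apart.
write_timestamped(timestamped, datetime(2024, 1, 15, 8, 0))
write_timestamped(timestamped, datetime(2024, 1, 15, 8, 1))
write_partitioned(partitioned, "2024-01-15")
write_partitioned(partitioned, "2024-01-15")

print(len(timestamped), len(partitioned))  # 2 1
```

The timestamped version leaves a duplicate behind on every retry, while the partitioned version converges on a single object.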

Use Descriptive Names

Asset names should describe what they produce, not how:

  • Bad: download_and_process_task
  • Good: dma_ais_silver

Partition Thoughtfully

Partition when:

  • Data is naturally incremental (daily ingestion)
  • Processing can be parallelized
  • You need backfill capability

Don’t partition when:

  • Partitions would be tiny (<100MB)
  • Dependencies exist between partitions
  • Data isn’t naturally incremental

Test Before Deploying

Always:

  1. Run dagster dev -m src.pipeline.repository
  2. Materialize a single partition
  3. Verify output in MinIO
  4. Then deploy to Kubernetes

What You’ve Learned

Dagster’s asset-based model makes data pipelines:

  • Easier to understand: The asset graph shows data lineage
  • Easier to test: Assets are just Python functions
  • Easier to maintain: Dependencies are explicit
  • Easier to operate: Backfills and retries are built in

The key insight: focus on the data products you’re creating, not the tasks that create them. Let Dagster handle orchestration while you define what data should exist.

What’s Next

Now that you understand Dagster fundamentals, Chapter 6 covers the bronze-to-silver transformation—converting raw CSV files to optimized Parquet using DuckDB.

