With your infrastructure running and data lake designed, it’s time to connect to real data. In this chapter, we’ll implement the first step of our pipeline: downloading AIS (Automatic Identification System) data from public sources and staging it in the bronze layer.

Our Data Source: Danish Maritime Authority

The Danish Maritime Authority (DMA) provides free, public AIS data covering Danish waters, the Baltic Sea, and the North Sea. This data is available as daily compressed files—perfect for learning data engineering patterns with real maritime traffic.

Website: https://www.dma.dk/safety-at-sea/navigational-information/ais-data

Data Characteristics

| Property | Value |
|---|---|
| Coverage | Danish waters, Baltic Sea, North Sea |
| Update frequency | Daily |
| File format | ZIP archive containing CSV |
| File size | 100-500 MB compressed per day |
| Historical data | Several years available |
| License | Public domain, free to use |
| Authentication | None required |

URL Pattern

DMA files follow a predictable URL pattern. Recent files are served from the root directory, while older files are organized into year subdirectories:

# Recent files (current month)
http://aisdata.ais.dk/aisdk-{YYYY}-{MM}-{DD}.zip

# Older files (archived by year)
http://aisdata.ais.dk/{YYYY}/aisdk-{YYYY}-{MM}-{DD}.zip

For example:

http://aisdata.ais.dk/aisdk-2025-01-28.zip
http://aisdata.ais.dk/2024/aisdk-2024-04-15.zip

Our downloader tries the root URL first and falls back to the year subdirectory if the server responds with 404 Not Found.
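Because the fallback order is fixed, it can be captured in a small pure function and checked without touching the network. This is a minimal sketch that mirrors _build_url and _build_fallback_url in the downloader later in this chapter; candidate_urls is a hypothetical helper name, not part of the downloader.

```python
from datetime import date

BASE_URL = "http://aisdata.ais.dk"  # same base URL as DMADownloader.BASE_URL


def candidate_urls(target_date: date, base_url: str = BASE_URL) -> list[str]:
    """Return download URLs in the order the downloader tries them."""
    filename = f"aisdk-{target_date.isoformat()}.zip"
    return [
        f"{base_url}/{filename}",                     # root directory (recent files)
        f"{base_url}/{target_date.year}/{filename}",  # year subdirectory (archived files)
    ]


for url in candidate_urls(date(2024, 4, 15)):
    print(url)
# http://aisdata.ais.dk/aisdk-2024-04-15.zip
# http://aisdata.ais.dk/2024/aisdk-2024-04-15.zip
```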

What’s in the Data

Each CSV includes these columns, among others:

| Column | Type | Description |
|---|---|---|
| Timestamp | String | UTC timestamp: “2024-01-15T12:34:56” |
| MMSI | Integer | Maritime Mobile Service Identity (9 digits) |
| Latitude | Float | Decimal degrees (-90 to 90) |
| Longitude | Float | Decimal degrees (-180 to 180) |
| SOG | Float | Speed over ground (knots) |
| COG | Float | Course over ground (degrees) |
| Heading | Integer | True heading (degrees) |
| Navigational status | String | “Under way using engine”, “At anchor”, etc. |
| Name | String | Vessel name |
| Ship type | String | “Cargo”, “Tanker”, “Passenger”, etc. |

Not all columns are populated for every message—position reports contain location and dynamics, while static data like vessel name comes from different AIS message types.
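In practice this means most fields have to be treated as optional when you read the files. Here is a minimal sketch of peeking inside an archive with only the standard library. It assumes the column names and the ISO timestamp format from the table above (check both against the header and first rows of a real file, since the exact spelling is an assumption here); iter_positions and _opt_float are hypothetical helpers, and the sample rows are synthetic.

```python
import csv
import io
import zipfile
from datetime import datetime, timezone
from typing import BinaryIO, Iterator, Optional, Union


def _opt_float(value: str) -> Optional[float]:
    """Turn an empty CSV field into None instead of raising ValueError."""
    return float(value) if value.strip() else None


def iter_positions(source: Union[str, BinaryIO]) -> Iterator[dict]:
    """Stream rows from the first CSV inside a DMA-style ZIP (path or file object)."""
    with zipfile.ZipFile(source) as archive:
        csv_name = next(n for n in archive.namelist() if n.lower().endswith(".csv"))
        with archive.open(csv_name) as raw:
            # Decompress and parse row by row rather than reading the whole file.
            reader = csv.DictReader(io.TextIOWrapper(raw, encoding="utf-8", newline=""))
            for row in reader:
                lat = _opt_float(row["Latitude"])
                lon = _opt_float(row["Longitude"])
                # AIS encodes "not available" as 91 / 181; drop anything out of range.
                if lat is not None and not -90 <= lat <= 90:
                    lat = None
                if lon is not None and not -180 <= lon <= 180:
                    lon = None
                yield {
                    "timestamp": datetime.fromisoformat(row["Timestamp"]).replace(tzinfo=timezone.utc),
                    "mmsi": int(row["MMSI"]),
                    "lat": lat,
                    "lon": lon,
                    "sog": _opt_float(row["SOG"]),
                    "name": row["Name"] or None,  # static field: often empty
                }


# Usage with a tiny synthetic archive (hypothetical MMSIs and values).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr(
        "aisdk-2024-01-15.csv",
        "Timestamp,MMSI,Latitude,Longitude,SOG,Name\n"
        "2024-01-15T12:34:56,219000001,55.5,10.5,12.3,\n"
        "2024-01-15T12:35:00,219000002,91,181,,EXAMPLE\n",
    )
buf.seek(0)
rows = list(iter_positions(buf))
```

Because zipfile.ZipFile accepts a path as well as a file object, the same function works on a downloaded day file, and rows are decompressed and parsed one at a time.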

Building the Downloader

Before wiring up Dagster, we’ll build the core download logic as a standalone module. This keeps the code testable and reusable—our Dagster asset will simply call this downloader.

The Implementation

"""DMA AIS Data Downloader

Downloads daily AIS data from the Danish Maritime Authority
and uploads it to the MinIO bronze layer.
"""

import logging
from datetime import date
from typing import Optional

import requests
from minio import Minio
from minio.error import S3Error
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
from urllib3.exceptions import HTTPError as Urllib3HTTPError

logger = logging.getLogger(__name__)


class DMADownloader:
    """Downloads AIS data from Danish Maritime Authority."""

    BASE_URL = "http://aisdata.ais.dk"
    CHUNK_SIZE = 10 * 1024 * 1024  # 10MB chunks for multipart upload

    def __init__(self, minio_client: Minio, bucket: str = "ais-lake"):
        self.minio_client = minio_client
        self.bucket = bucket
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self) -> None:
        """Create bucket if it doesn't exist."""
        if not self.minio_client.bucket_exists(self.bucket):
            self.minio_client.make_bucket(self.bucket)

    def _build_url(self, target_date: date) -> str:
        """Build primary download URL (root directory for recent files)."""
        filename = f"aisdk-{target_date.isoformat()}.zip"
        return f"{self.BASE_URL}/{filename}"

    def _build_fallback_url(self, target_date: date) -> str:
        """Build fallback URL (year subdirectory for older files)."""
        filename = f"aisdk-{target_date.isoformat()}.zip"
        return f"{self.BASE_URL}/{target_date.year}/{filename}"

    def _build_object_path(self, target_date: date) -> str:
        """Build MinIO object path for bronze layer."""
        filename = f"aisdk-{target_date.isoformat()}.zip"
        return f"bronze/dma/{filename}"

    def check_exists(self, target_date: date) -> bool:
        """Check if data for a date already exists in bronze layer."""
        object_path = self._build_object_path(target_date)
        try:
            self.minio_client.stat_object(self.bucket, object_path)
            return True
        except S3Error as e:
            if e.code == "NoSuchKey":
                return False
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # requests errors: connection failures, timeouts, raise_for_status().
        # urllib3 errors: connection drops while MinIO is reading response.raw.
        retry=retry_if_exception_type((requests.RequestException, Urllib3HTTPError)),
        # Re-raise the last real exception instead of tenacity.RetryError,
        # so callers see the errors listed under "Raises" below.
        reraise=True,
    )
    def download_day(
        self, target_date: date, skip_if_exists: bool = True
    ) -> Optional[str]:
        """
        Download AIS data for a specific date.

        Args:
            target_date: The date to download data for
            skip_if_exists: Skip download if file already exists

        Returns:
            Object path in MinIO if successful, None if data not available

        Raises:
            requests.HTTPError: For non-404 HTTP errors after retries
            S3Error: For MinIO upload failures
        """
        object_path = self._build_object_path(target_date)

        # Check if already downloaded (idempotency)
        if skip_if_exists and self.check_exists(target_date):
            logger.info(f"Skipping {target_date}, already exists: {object_path}")
            return object_path

        # Try primary URL first (root directory for recent files)
        url = self._build_url(target_date)
        logger.info(f"Downloading from: {url}")

        response = requests.get(url, stream=True, timeout=300)

        # If 404, try fallback URL (year subdirectory)
        if response.status_code == 404:
            response.close()
            url = self._build_fallback_url(target_date)
            logger.info(f"Trying fallback URL: {url}")
            response = requests.get(url, stream=True, timeout=300)

        # Still 404? Data not available
        if response.status_code == 404:
            response.close()
            logger.info(f"No data available for {target_date}")
            return None

        response.raise_for_status()

        # Log download size if available
        content_length = response.headers.get("Content-Length")
        if content_length:
            logger.info(f"Downloading {int(content_length) / 1024 / 1024:.1f} MB")

        try:
            # Stream to MinIO part by part instead of loading the file into memory
            self.minio_client.put_object(
                bucket_name=self.bucket,
                object_name=object_path,
                data=response.raw,
                length=-1,  # unknown length: multipart upload in part_size chunks
                part_size=self.CHUNK_SIZE,
                content_type="application/zip",
            )

            logger.info(f"Uploaded to: {object_path}")
            return object_path

        finally:
            response.close()

Key Design Decisions

Three patterns make this downloader production-ready:

True streaming: The line data=response.raw with length=-1 is crucial. It streams the HTTP response body to MinIO as a multipart upload, so memory use stays on the order of the part size (CHUNK_SIZE, 10MB) instead of the entire 500MB file. Many tutorials use response.content, which loads the whole file into RAM first. That works for small files but can exhaust memory on a modest worker once files reach hundreds of megabytes.
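To see why memory stays bounded, here is a simplified model of how a streaming multipart upload consumes a stream of unknown length. It is not minio-py's actual implementation, only the idea behind it; iter_parts is a hypothetical helper.

```python
import io
from typing import BinaryIO, Iterator

PART_SIZE = 10 * 1024 * 1024  # matches DMADownloader.CHUNK_SIZE


def iter_parts(stream: BinaryIO, part_size: int = PART_SIZE) -> Iterator[bytes]:
    """Yield parts of exactly part_size bytes (the last one may be shorter).

    Only the current part is held in memory, regardless of total stream size.
    """
    while True:
        buf = bytearray()
        # A single read() may return fewer bytes than asked for, so keep reading.
        while len(buf) < part_size:
            chunk = stream.read(part_size - len(buf))
            if not chunk:
                break
            buf.extend(chunk)
        if not buf:
            return  # stream ended exactly on a part boundary
        yield bytes(buf)
        if len(buf) < part_size:
            return  # a short part means the stream is exhausted


parts = list(iter_parts(io.BytesIO(b"x" * 25), part_size=10))
print([len(p) for p in parts])  # [10, 10, 5]
```

On S3-compatible stores every part except the last must be at least 5 MiB, which is one reason CHUNK_SIZE can't be made arbitrarily small.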

Smart retries: The retry_if_exception_type filter retries only on network-level failures (timeouts, connection drops, HTTP errors raised by raise_for_status), not on programming bugs like TypeError. Without this filter, tenacity would retry on any exception, so a code error would run up to three times, with backoff waits in between, before finally failing.
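The same policy can be written out with only the standard library, which makes its behavior easy to test. This is an illustration, not tenacity's code: call_with_retry and exponential_wait are hypothetical helpers, and the wait formula (multiplier * 2**(attempt - 1), clamped between min and max) is our reading of tenacity's wait_exponential.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def exponential_wait(attempt: int, multiplier: float = 1, minimum: float = 4, maximum: float = 10) -> float:
    """Seconds to wait after failed attempt number `attempt` (1-based)."""
    return max(minimum, min(multiplier * 2 ** (attempt - 1), maximum))


def call_with_retry(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only when it raises one of the retry_on types."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:  # anything else (e.g. TypeError) propagates immediately
            if attempt == attempts:
                raise  # out of attempts: surface the original exception
            sleep(exponential_wait(attempt))
    raise AssertionError("unreachable")


# Usage: a call that fails twice with a network-style error, then succeeds.
waits: list[float] = []
attempts_made = {"n": 0}


def flaky() -> str:
    attempts_made["n"] += 1
    if attempts_made["n"] < 3:
        raise ConnectionError("simulated connection drop")
    return "ok"


result = call_with_retry(flaky, retry_on=(ConnectionError,), sleep=waits.append)
print(result, waits)  # ok [4, 4]
```

If that reading of wait_exponential is right, the downloader's settings (multiplier=1, min=4, max=10) produce two 4-second waits, because the raw values 1 and 2 are clamped up to min.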

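Idempotency: When skip_if_exists is True (the default), download_day first checks whether the object already exists in the bronze layer and, if so, returns its path without downloading. You can therefore safely re-run the pipeline after a failure: files that are already stored are skipped, and the run picks up where the previous one left off.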
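The skip-if-exists check is easy to exercise in isolation with a fake store. A minimal sketch: FakeStore, ingest_day, and fake_fetch are hypothetical stand-ins for the MinIO client and the HTTP download, using the same bronze/dma/ path layout as _build_object_path.

```python
from datetime import date
from typing import Callable, Optional


class FakeStore:
    """Hypothetical in-memory stand-in for the bronze bucket."""

    def __init__(self) -> None:
        self.objects: set[str] = set()

    def exists(self, path: str) -> bool:
        return path in self.objects

    def put(self, path: str) -> None:
        self.objects.add(path)


def ingest_day(store: FakeStore, target_date: date, fetch: Callable[[date], Optional[bytes]]) -> Optional[str]:
    """Skip-if-exists ingestion, following the same order as download_day."""
    path = f"bronze/dma/aisdk-{target_date.isoformat()}.zip"
    if store.exists(path):
        return path  # already ingested: no network call
    if fetch(target_date) is None:
        return None  # nothing published for that date
    store.put(path)
    return path


fetched: list[date] = []


def fake_fetch(d: date) -> bytes:
    fetched.append(d)
    return b"zip-bytes"


store = FakeStore()
day = date(2024, 4, 15)
first = ingest_day(store, day, fake_fetch)
second = ingest_day(store, day, fake_fetch)  # a re-run after a "failure"
print(first, len(fetched))  # bronze/dma/aisdk-2024-04-15.zip 1
```

With a multipart upload, the object only becomes visible once the upload is completed, so an interrupted transfer does not leave behind something the existence check would mistake for a finished file.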

Testing the Downloader

Before integrating with Dagster, verify the downloader works:

# Set environment variables
export MINIO_ENDPOINT="<your-minio-ip>:9000"
export MINIO_ACCESS_KEY="your-access-key"
export MINIO_SECRET_KEY="your-secret-key"
import os
from datetime import date, timedelta
from minio import Minio
from ingestion.downloaders.dma_downloader import DMADownloader

client = Minio(
    endpoint=os.getenv("MINIO_ENDPOINT"),
    access_key=os.getenv("MINIO_ACCESS_KEY"),
    secret_key=os.getenv("MINIO_SECRET_KEY"),
    secure=False,
)

downloader = DMADownloader(client)

# Download data from a week ago (should exist)
target_date = date.today() - timedelta(days=7)
object_path = downloader.download_day(target_date)

# Run again to verify idempotency (should skip)
downloader.download_day(target_date)

When backfilling historical data, be respectful of DMA’s servers—download sequentially and consider running large backfills during off-hours.
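A sequential backfill can reuse download_day directly. This is a minimal sketch, assuming a fixed pause between requests is an acceptable way to be polite; backfill and date_range are hypothetical helpers, and the 5-second default is an arbitrary placeholder rather than a DMA guideline.

```python
import time
from datetime import date, timedelta
from typing import Callable, Dict, Iterator, Optional


def date_range(start: date, end: date) -> Iterator[date]:
    """Yield every calendar day from start to end, inclusive."""
    day = start
    while day <= end:
        yield day
        day += timedelta(days=1)


def backfill(
    download_day: Callable[[date], Optional[str]],
    start: date,
    end: date,
    pause_seconds: float = 5.0,  # arbitrary placeholder, not a DMA guideline
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[date, Optional[str]]:
    """Download one day at a time, oldest first, pausing between requests."""
    results: Dict[date, Optional[str]] = {}
    for i, day in enumerate(date_range(start, end)):
        if i > 0:
            sleep(pause_seconds)  # no pause before the very first request
        results[day] = download_day(day)  # sequential: one request in flight
    return results
```

For example, backfill(downloader.download_day, date(2024, 4, 1), date(2024, 4, 30)) fetches April 2024 one day at a time. The pause also applies to days that are skipped because they already exist; if that matters, call downloader.check_exists first and pause only after real downloads.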

What You’ve Built

You now have a production-ready downloader that streams 500MB files without exhausting memory, retries intelligently on network failures, and supports safe re-runs through idempotency. The bronze layer stores original ZIP files exactly as received from DMA—we’ll extract and transform them in later chapters.

What’s Next

With data flowing into the bronze layer, we need to orchestrate these downloads automatically. Chapter 5 introduces Dagster’s asset-based approach and shows how to create pipeline assets that call this downloader on a schedule.

