With your infrastructure running and data lake designed, it’s time to connect to real data. In this chapter, we’ll implement the first step of our pipeline: downloading AIS (Automatic Identification System) data from public sources and staging it in the bronze layer.
Our Data Source: Danish Maritime Authority
The Danish Maritime Authority (DMA) provides free, public AIS data covering Danish waters, the Baltic Sea, and the North Sea. This data is available as daily compressed files—perfect for learning data engineering patterns with real maritime traffic.
Website: https://www.dma.dk/safety-at-sea/navigational-information/ais-data
Data Characteristics
| Property | Value |
|---|---|
| Coverage | Danish waters, Baltic Sea, North Sea |
| Update frequency | Daily |
| File format | ZIP archive containing CSV |
| File size | 100-500MB compressed per day |
| Historical data | Several years available |
| License | Public domain, free to use |
| Authentication | None required |
URL Pattern
DMA files follow a predictable URL pattern. Recent files are served from the root directory, while older files are organized into year subdirectories:
```text
# Recent files (current month)
http://aisdata.ais.dk/aisdk-{YYYY}-{MM}-{DD}.zip

# Older files (archived by year)
http://aisdata.ais.dk/{YYYY}/aisdk-{YYYY}-{MM}-{DD}.zip
```
For example:
```text
http://aisdata.ais.dk/aisdk-2025-01-28.zip
http://aisdata.ais.dk/2024/aisdk-2024-04-15.zip
```
Our downloader will try the root URL first, then fall back to the year subdirectory if the server returns a 404.
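The lookup order can be captured in a small pure function, which makes it easy to unit-test without touching the network. This is a sketch: `candidate_urls` is a hypothetical helper name, and it assumes the two URL patterns above are the only ones in use.

```python
from datetime import date

BASE_URL = "http://aisdata.ais.dk"


def candidate_urls(target_date: date, base_url: str = BASE_URL) -> list:
    """Return download URLs in the order the downloader should try them.

    Hypothetical helper mirroring the two DMA URL patterns described above.
    """
    filename = f"aisdk-{target_date.isoformat()}.zip"
    return [
        f"{base_url}/{filename}",  # recent files: root directory
        f"{base_url}/{target_date.year}/{filename}",  # archived files: year subdirectory
    ]


for url in candidate_urls(date(2024, 4, 15)):
    print(url)
```

A downloader then requests each URL in turn and stops at the first response that is not a 404.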
What’s in the Data
Each CSV includes these key columns (the files contain others as well, such as IMO and call sign):
| Column | Type | Description |
|---|---|---|
| Timestamp | String | UTC timestamp, e.g. “2024-01-15T12:34:56” |
| MMSI | Integer | Maritime Mobile Service Identity (9 digits) |
| Latitude | Float | Decimal degrees (-90 to 90) |
| Longitude | Float | Decimal degrees (-180 to 180) |
| SOG | Float | Speed over ground (knots) |
| COG | Float | Course over ground (degrees) |
| Heading | Integer | True heading (degrees) |
| Navigational status | String | e.g. “Under way using engine”, “At anchor” |
| Name | String | Vessel name |
| Ship type | String | e.g. “Cargo”, “Tanker”, “Passenger” |
Not all columns are populated for every message—position reports contain location and dynamics, while static data like vessel name comes from different AIS message types.
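Because rows can have empty fields, downstream code should not assume every row has a position. The sketch below reads the CSV straight out of the ZIP with the standard library and keeps only rows that carry coordinates. It assumes (not confirmed by DMA documentation) that the archive holds one UTF-8 CSV with a header row using the `Latitude` and `Longitude` names from the table above; `iter_position_rows` is a hypothetical helper, and the sample data is made up.

```python
import csv
import io
import zipfile
from typing import IO, Dict, Iterator, Union


def iter_position_rows(source: Union[str, IO[bytes]]) -> Iterator[Dict[str, str]]:
    """Yield rows from the CSV inside a ZIP archive that carry a position.

    Assumptions: one CSV member, UTF-8 encoding, a header row, and
    position columns named "Latitude" and "Longitude".
    """
    with zipfile.ZipFile(source) as archive:
        csv_names = [n for n in archive.namelist() if n.lower().endswith(".csv")]
        if not csv_names:
            raise ValueError("no CSV file found in archive")
        with archive.open(csv_names[0]) as raw:
            # Decode lazily: rows are read one at a time, not all at once
            text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
            for row in csv.DictReader(text):
                # Rows from static/voyage messages can have empty coordinates
                if row.get("Latitude") and row.get("Longitude"):
                    yield row


# Tiny made-up archive standing in for a real daily file.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr(
        "aisdk-2024-01-15.csv",
        "MMSI,Latitude,Longitude,Name\n"
        "219000001,55.7,12.6,\n"
        "219000001,,,EXAMPLE VESSEL\n",
    )
buffer.seek(0)

rows = list(iter_position_rows(buffer))
print(len(rows))  # 1: the row without coordinates is skipped
```

For a real daily file, pass the path of the downloaded ZIP instead of a buffer; since the generator decompresses and parses row by row, it does not need the whole CSV in memory.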
Building the Downloader
Before wiring up Dagster, we’ll build the core download logic as a standalone module. This keeps the code testable and reusable—our Dagster asset will simply call this downloader.
The Implementation
```python
"""DMA AIS Data Downloader

Downloads daily AIS data from the Danish Maritime Authority
and uploads it to the MinIO bronze layer.
"""

import logging
from datetime import date
from typing import Optional

import requests
from minio import Minio
from minio.error import S3Error
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

logger = logging.getLogger(__name__)


class DMADownloader:
    """Downloads AIS data from Danish Maritime Authority."""

    BASE_URL = "http://aisdata.ais.dk"
    CHUNK_SIZE = 10 * 1024 * 1024  # 10MB parts for multipart upload (MinIO requires >= 5MB)

    def __init__(self, minio_client: Minio, bucket: str = "ais-lake"):
        self.minio_client = minio_client
        self.bucket = bucket
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self) -> None:
        """Create bucket if it doesn't exist."""
        if not self.minio_client.bucket_exists(self.bucket):
            self.minio_client.make_bucket(self.bucket)

    def _build_url(self, target_date: date) -> str:
        """Build primary download URL (root directory for recent files)."""
        filename = f"aisdk-{target_date.isoformat()}.zip"
        return f"{self.BASE_URL}/{filename}"

    def _build_fallback_url(self, target_date: date) -> str:
        """Build fallback URL (year subdirectory for older files)."""
        filename = f"aisdk-{target_date.isoformat()}.zip"
        return f"{self.BASE_URL}/{target_date.year}/{filename}"

    def _build_object_path(self, target_date: date) -> str:
        """Build MinIO object path for bronze layer."""
        filename = f"aisdk-{target_date.isoformat()}.zip"
        return f"bronze/dma/{filename}"

    def check_exists(self, target_date: date) -> bool:
        """Check if data for a date already exists in bronze layer."""
        object_path = self._build_object_path(target_date)
        try:
            self.minio_client.stat_object(self.bucket, object_path)
            return True
        except S3Error as e:
            if e.code == "NoSuchKey":
                return False
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type(requests.RequestException),
        reraise=True,  # re-raise the last error itself, not tenacity.RetryError
    )
    def download_day(
        self, target_date: date, skip_if_exists: bool = True
    ) -> Optional[str]:
        """
        Download AIS data for a specific date.

        Args:
            target_date: The date to download data for
            skip_if_exists: Skip download if file already exists

        Returns:
            Object path in MinIO if successful, None if data not available

        Raises:
            requests.HTTPError: For non-404 HTTP errors after retries
            S3Error: For MinIO upload failures
        """
        object_path = self._build_object_path(target_date)

        # Check if already downloaded (idempotency)
        if skip_if_exists and self.check_exists(target_date):
            logger.info(f"Skipping {target_date}, already exists: {object_path}")
            return object_path

        # Try primary URL first (root directory for recent files)
        url = self._build_url(target_date)
        logger.info(f"Downloading from: {url}")
        response = requests.get(url, stream=True, timeout=300)

        # If 404, try fallback URL (year subdirectory)
        if response.status_code == 404:
            response.close()
            url = self._build_fallback_url(target_date)
            logger.info(f"Trying fallback URL: {url}")
            response = requests.get(url, stream=True, timeout=300)

        # Still 404? Data not available
        if response.status_code == 404:
            response.close()
            logger.info(f"No data available for {target_date}")
            return None

        try:
            # Raise HTTPError for other 4xx/5xx responses (retried by the decorator);
            # inside the try block so the connection is closed either way
            response.raise_for_status()

            # Log download size if available
            content_length = response.headers.get("Content-Length")
            if content_length:
                logger.info(f"Downloading {int(content_length) / 1024 / 1024:.1f} MB")

            # Stream directly to MinIO without loading the whole file into memory
            self.minio_client.put_object(
                bucket_name=self.bucket,
                object_name=object_path,
                data=response.raw,
                length=-1,  # unknown length: MinIO uploads in part_size chunks
                part_size=self.CHUNK_SIZE,
                content_type="application/zip",
            )
            logger.info(f"Uploaded to: {object_path}")
            return object_path
        finally:
            response.close()
```
Key Design Decisions
Three patterns make this downloader production-ready:
True streaming: Passing data=response.raw with length=-1 is crucial. MinIO reads the HTTP response in part_size chunks and uploads each part as it arrives, so memory use scales with the part size rather than with the 500MB file. Many tutorials use response.content, which loads the entire body into RAM first; that works for small files but fails at scale.
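Smart retries: Passing retry_if_exception_type(requests.RequestException) to the @retry decorator limits retries to errors raised by requests, such as connection errors, timeouts, and the HTTPError that raise_for_status() raises for non-404 error responses. Programming bugs like TypeError are not retried. Without this filter, a code error would be attempted three times, with backoff waits in between, before finally failing.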
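Idempotency: With skip_if_exists=True (the default), download_day() first calls check_exists() and returns early if the object is already in the bronze layer. You can therefore safely re-run the pipeline after failures: already-downloaded files are skipped, and the pipeline picks up where it left off. An interrupted upload does not leave a partial object behind, because a multipart upload only becomes a visible object once it completes, so check_exists() will not mistake it for a finished download.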
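The retry filter is the subtlest of these patterns, so here is a dependency-free sketch of the same idea. It is not tenacity's implementation: `call_with_retries` is a hypothetical helper, the built-in `ConnectionError` and `TimeoutError` stand in for `requests.RequestException`, and the backoff schedule is illustrative rather than tenacity's exact wait times.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    base_delay: float = 4.0,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only when it raises one of the retry_on types.

    Any other exception propagates immediately, on the first attempt.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: re-raise the original error
            # Exponential backoff: base_delay, 2 * base_delay, ... capped at max_delay
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
    raise AssertionError("unreachable")


# Demo: a flaky call that succeeds on the third attempt, and a buggy one.
calls = {"flaky": 0, "buggy": 0}


def flaky() -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("simulated dropped connection")
    return "ok"


def buggy() -> str:
    calls["buggy"] += 1
    raise TypeError("simulated programming bug")


def no_wait(seconds: float) -> None:
    """Skip real sleeping in the demo."""


print(call_with_retries(flaky, sleep=no_wait))  # ok
try:
    call_with_retries(buggy, sleep=no_wait)
except TypeError:
    print(calls["buggy"])  # 1: TypeError is not in retry_on, so no retry
```

The flaky call succeeds after two retries, while the buggy call fails on its first attempt instead of burning retries and backoff time.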
Testing the Downloader
Before integrating with Dagster, verify the downloader works:
```shell
# Set environment variables
export MINIO_ENDPOINT="<your-minio-ip>:9000"
export MINIO_ACCESS_KEY="your-access-key"
export MINIO_SECRET_KEY="your-secret-key"
```
Then, from the same shell session, run:
```python
import logging
import os
from datetime import date, timedelta

from minio import Minio

from ingestion.downloaders.dma_downloader import DMADownloader

# Show the downloader's INFO log messages (e.g. "Skipping ...")
logging.basicConfig(level=logging.INFO)

client = Minio(
    endpoint=os.environ["MINIO_ENDPOINT"],
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,  # plain HTTP; set to True if MinIO is served over TLS
)
downloader = DMADownloader(client)

# Download data from a week ago (should exist)
target_date = date.today() - timedelta(days=7)
object_path = downloader.download_day(target_date)
print(f"Stored at: {object_path}")

# Run again to verify idempotency (should log "Skipping ...")
downloader.download_day(target_date)
```
When backfilling historical data, be respectful of DMA’s servers—download sequentially and consider running large backfills during off-hours.
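A sequential backfill can be as simple as a loop with a pause between days. The sketch below assumes the `download_day` contract from the downloader above (an object path, or `None` when DMA has no file); `backfill` and `daterange` are hypothetical helpers, and the 5-second default pause is an arbitrary choice, not a DMA requirement.

```python
import time
from datetime import date, timedelta
from typing import Callable, Dict, Iterator, List, Optional


def daterange(start: date, end: date) -> Iterator[date]:
    """Yield each date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def backfill(
    download_day: Callable[[date], Optional[str]],
    start: date,
    end: date,
    pause_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[date, Optional[str]]:
    """Run download_day one date at a time, pausing between dates.

    download_day is assumed to behave like DMADownloader.download_day:
    it returns an object path, or None when no file exists for that date.
    """
    results: Dict[date, Optional[str]] = {}
    for index, day in enumerate(daterange(start, end)):
        if index > 0:
            sleep(pause_seconds)  # space out requests to DMA's servers
        results[day] = download_day(day)
    return results


# Demo with a stand-in for the real downloader (no network access).
def fake_download_day(day: date) -> Optional[str]:
    if day.day == 3:
        return None  # pretend DMA has no file for this date
    return f"bronze/dma/aisdk-{day.isoformat()}.zip"


pauses: List[float] = []
results = backfill(fake_download_day, date(2024, 4, 1), date(2024, 4, 4), sleep=pauses.append)
missing = [d for d, path in results.items() if path is None]
print(missing)  # [datetime.date(2024, 4, 3)]
```

With the real downloader you would pass the bound method, for example backfill(downloader.download_day, date(2024, 4, 1), date(2024, 4, 30)). Because download_day skips files already in the bronze layer, an interrupted backfill can simply be re-run. This sketch still pauses before days that end up being skipped; calling check_exists() first would avoid that.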
What You’ve Built
You now have a production-ready downloader that streams 500MB files without exhausting memory, retries intelligently on network failures, and supports safe re-runs through idempotency. The bronze layer stores original ZIP files exactly as received from DMA—we’ll extract and transform them in later chapters.
What’s Next
With data flowing into the bronze layer, we need to orchestrate these downloads automatically. Chapter 5 introduces Dagster’s asset-based approach and shows how to create pipeline assets that call this downloader on a schedule.