How to Structure a DAG for Raster Processing

To structure a DAG for raster processing, decompose the workflow into four deterministic stages: metadata validation, spatial partitioning (tiling/chunking), parallelized pixel computation, and consolidated export. Each stage must operate as an isolated task with strict I/O contracts, leveraging dynamic task mapping for chunk-level parallelism while maintaining CRS consistency and deterministic tile indexing. This architecture prevents memory exhaustion, enables idempotent retries, and aligns with established DAG Design Principles for Spatial ETL by treating raster boundaries as explicit state transitions rather than implicit assumptions.

The Four-Stage Deterministic Pipeline

Raster processing fails when monolithic tasks attempt to load full scenes into memory or when downstream tasks assume uniform tile alignment. A production-ready DAG must enforce:

  1. Metadata Gatekeeping: Extract CRS, resolution, bounding box, and band count before any partitioning. Reject mismatched projections early to prevent silent geospatial drift.
  2. Deterministic Grid Generation: Compute row/column windows using fixed tile sizes (e.g., 1024×1024 or 2048×2048). Store window coordinates as task parameters to guarantee identical retry behavior and cache-friendly execution.
  3. Dynamic Mapping: Spawn worker tasks per tile using orchestration-native mapping. Avoid manual for loops in the flow layer; let the scheduler distribute work across available executors.
  4. Seamless Aggregation: Use a merge/reduce task that respects window transforms, handles nodata padding, and writes a single output with consistent compression and tiling.

This pattern scales across cloud runners, Kubernetes pods, or HPC schedulers. For deeper architectural context on state management and spatial partitioning, reference the Geospatial Orchestration Architecture & Fundamentals pillar.

Production Implementation (Prefect 2.x)

The following snippet demonstrates a complete, retry-safe DAG using Prefect 2.x dynamic mapping and rasterio windowed I/O. It avoids loading full datasets into RAM by streaming tiles through isolated workers.

from pathlib import Path
from typing import List, Tuple, Dict

import numpy as np
import rasterio
from rasterio.windows import Window
from rasterio.merge import merge
from prefect import flow, task, unmapped

@task(retries=2, retry_delay_seconds=10)
def extract_raster_metadata(src_path: str) -> Dict:
    """Gatekeeping stage: validate and extract core raster properties."""
    with rasterio.open(src_path) as src:
        return {
            "crs": src.crs.to_string(),
            "transform": src.transform,
            "width": src.width,
            "height": src.height,
            "count": src.count,
            "dtype": str(src.dtypes[0]),
            "nodata": src.nodata,
        }

@task(retries=1)
def generate_tile_grid(meta: Dict, tile_size: int = 1024) -> List[Tuple[int, int, int, int]]:
    """Deterministic grid generation: (col_off, row_off, width, height) per tile."""
    grid: List[Tuple[int, int, int, int]] = []
    for r in range(0, meta["height"], tile_size):
        for c in range(0, meta["width"], tile_size):
            w = min(tile_size, meta["width"] - c)
            h = min(tile_size, meta["height"] - r)
            grid.append((c, r, w, h))
    return grid

@task(retries=2, retry_delay_seconds=5)
def process_tile(src_path: str, window_coords: Tuple[int, int, int, int], out_dir: str) -> str:
    """Parallelized computation: read window, apply transform, write tile."""
    col_off, row_off, width, height = window_coords
    tile_window = Window(col_off, row_off, width, height)

    out_path = Path(out_dir) / f"tile_{row_off}_{col_off}.tif"

    with rasterio.open(src_path) as src:
        data = src.read(window=tile_window)
        # Example: simple band math or normalization
        processed = np.clip(data.astype(np.float32) / 255.0, 0, 1)

        profile = src.profile.copy()
        profile.update({
            "driver": "GTiff",
            "dtype": "float32",
            "width": width,
            "height": height,
            "transform": rasterio.windows.transform(tile_window, src.transform),
            "compress": "deflate",
            "tiled": True,
            "blockxsize": 256,
            "blockysize": 256,
        })

        with rasterio.open(out_path, "w", **profile) as dst:
            dst.write(processed)

    return str(out_path)

@task(retries=1)
def consolidate_tiles(tile_paths: List[str], output_path: str) -> str:
    """Aggregation: merge processed tiles into a single geotiff."""
    datasets = [rasterio.open(p) for p in tile_paths]
    try:
        merged_data, merged_transform = merge(datasets)

        profile = datasets[0].profile.copy()
        profile.update({
            "height": merged_data.shape[1],
            "width": merged_data.shape[2],
            "transform": merged_transform,
            "compress": "lzw",
            "tiled": True,
        })

        with rasterio.open(output_path, "w", **profile) as dst:
            dst.write(merged_data)
    finally:
        for ds in datasets:
            ds.close()

    return output_path

@flow(log_prints=True)
def raster_processing_dag(src_path: str, output_path: str, tile_size: int = 1024) -> str:
    meta = extract_raster_metadata(src_path)
    grid = generate_tile_grid(meta, tile_size)

    temp_dir = Path("temp_tiles")
    temp_dir.mkdir(exist_ok=True)

    # Dynamic mapping: spawns one task per window coordinate.
    # `unmapped` keeps scalar args fixed across the mapped invocations.
    tile_futures = process_tile.map(
        src_path=unmapped(src_path),
        window_coords=grid,
        out_dir=unmapped(str(temp_dir)),
    )

    # Resolving each future blocks until the corresponding tile is written.
    completed_paths = [f.result() for f in tile_futures]

    final_output = consolidate_tiles(completed_paths, output_path)
    print(f"Pipeline complete: {final_output}")
    return final_output

if __name__ == "__main__":
    raster_processing_dag("input.tif", "output_processed.tif")

Engineering Guardrails & Scaling Patterns

Memory & I/O Contracts

Never pass raw arrays between tasks. Serialize only file paths, window coordinates, and lightweight metadata dictionaries. Rasterio’s windowed reading documentation confirms that streaming chunks via Window objects prevents OOM failures on multi-gigabyte scenes. Always enforce tiled=True and blockxsize/blockysize in output profiles to optimize downstream cloud storage reads.

Idempotency & Retry Safety

Deterministic grid generation ensures that a retried tile always writes to the same output path. Combine this with orchestration-level retries and exponential backoff to handle transient cloud storage throttling. Avoid side effects in the flow layer; keep all state mutations inside tasks. Prefect’s dynamic task mapping guide explicitly recommends mapping over parameter lists rather than iterating inside flows to preserve DAG visibility and retry isolation.

CRS & Nodata Consistency

Validate that all input rasters share the same EPSG code before partitioning. If your pipeline ingests multi-source data, inject a reprojection task between metadata extraction and grid generation. Always propagate the source nodata value through the merge stage; failing to mask nodata during aggregation creates artificial seams and corrupts statistical outputs.

Horizontal Scaling

This DAG architecture is executor-agnostic. Deploy it on Kubernetes via KubernetesJob blocks, scale across AWS Batch queues, or run on HPC schedulers using DaskTaskRunner. The bottleneck is rarely compute; it is disk I/O and network egress. Place intermediate tiles on high-throughput NVMe volumes or cloud object storage with multipart upload enabled. For enterprise deployments, consult the broader Geospatial Orchestration Architecture & Fundamentals pillar to align task routing, credential rotation, and observability pipelines.