
Async ingestion and process jobs: progress reporting, retry, and resume #89

@turban


Problem

Ingesting large historical time ranges is slow, fragile, and opaque:

  • Downloads run synchronously: POST /ingestions blocks until the ingestion completes, so HTTP connections time out on multi-year backfills
  • A transient network error aborts the entire ingestion with no automatic retry
  • There is no visibility into progress or estimated completion time
  • POST /processes/{id}/execution has the same problem: both endpoints share the same blocking dispatch pattern

Real-world scale

ERA5-Land, Norway, 1990–present (~1 month of data per minute; a 35-year backfill takes ~7 hours and fails midway on transient ContentLengthErrors from DestinE):

|                                     | Per variable | Both variables |
|-------------------------------------|--------------|----------------|
| Intermediate downloads              | ~45 GB       | ~90 GB         |
| Zarr store (est. 2–4× compression)  | ~11–23 GB    | ~23–45 GB      |

seNorge 2018, Norway, 1990–2026 (OPeNDAP via THREDDS, ~30 s/month):

|                                     | Per variable | Both variables |
|-------------------------------------|--------------|----------------|
| Intermediate downloads              | ~95 GB       | ~190 GB        |
| Zarr store (est. 3–5× compression)  | ~19–32 GB    | ~38–64 GB      |

Failure modes differ by provider:

  • ERA5-Land: HTTP ContentLengthError, detected immediately
  • seNorge OPeNDAP: fails silently mid-write, leaving a 48-byte NetCDF stub that is structurally valid but empty; the next run treats it as a completed download and aborts later when it tries to open it

Design

Shared async job model

Both ingestion and process execution should use the same job abstraction. Designing them separately risks two diverging async patterns that plugin authors have to understand independently.

POST /ingestions and POST /processes/{id}/execution both return immediately with a job ID:

```json
{"job_id": "abc123", "status": "running"}
```

Progress is polled via GET /ingestions/{job_id} or GET /processes/{id}/jobs/{job_id}:

```json
{
  "job_id": "abc123",
  "status": "running",
  "attempt": 1,
  "progress": {
    "done": 180,
    "total": 437,
    "percent": 41,
    "message": "Downloaded 2005-01"
  }
}
```
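One way to keep both endpoints on a single job model is a shared record type that both serialise into the response shape above. This is a minimal sketch: `Job`, `JobProgress` and `to_response` are hypothetical names, and `percent` is assumed to be rounded down (180 of 437 gives 41, matching the example).

```python
from dataclasses import dataclass, field


@dataclass
class JobProgress:
    done: int = 0
    total: int = 0
    message: str = ""

    @property
    def percent(self) -> int:
        # Integer percentage, rounded down; 0 while the total is still unknown.
        return self.done * 100 // self.total if self.total else 0


@dataclass
class Job:
    # Hypothetical shared record for ingestion and process jobs.
    job_id: str
    status: str = "running"
    attempt: int = 1
    progress: JobProgress = field(default_factory=JobProgress)

    def to_response(self) -> dict:
        # Same body for GET /ingestions/{job_id} and GET /processes/{id}/jobs/{job_id}.
        return {
            "job_id": self.job_id,
            "status": self.status,
            "attempt": self.attempt,
            "progress": {
                "done": self.progress.done,
                "total": self.progress.total,
                "percent": self.progress.percent,
                "message": self.progress.message,
            },
        }
```

`Job("abc123", progress=JobProgress(180, 437, "Downloaded 2005-01")).to_response()` reproduces the example body. Because both endpoints render through the same type, a plugin author only has to learn one status vocabulary.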

Jobs are persisted to disk so that a server restart does not lose their state. In-memory-only jobs would be lost on the restart that often follows a crash.
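Persistence could be as simple as one JSON file per job, rewritten atomically so a crash mid-write never leaves a corrupt job record (the same failure mode as the seNorge stub). A minimal sketch under those assumptions: `JOBS_DIR`, `create_job`, `read_job` and `recover_jobs` are hypothetical; `update_job`, `complete_job` and `fail_job` match the calls made by the dispatcher below, but the flat storage layout and the "completed"/"failed" status names are assumptions.

```python
import json
import os
import tempfile
from pathlib import Path

# Hypothetical location; a real deployment would make this configurable.
JOBS_DIR = Path("jobs")


def _job_path(job_id: str) -> Path:
    return JOBS_DIR / f"{job_id}.json"


def _write_atomic(path: Path, data: dict) -> None:
    # Write to a temp file in the same directory, then rename it over the target.
    # os.replace is atomic on POSIX filesystems, so readers never see half a file.
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
        os.replace(tmp, path)
    except BaseException:
        os.unlink(tmp)
        raise


def create_job(job_id: str) -> dict:
    job = {"job_id": job_id, "status": "running", "attempt": 1}
    _write_atomic(_job_path(job_id), job)
    return job


def read_job(job_id: str) -> dict:
    return json.loads(_job_path(job_id).read_text())


def update_job(job_id: str, **fields) -> None:
    # Read-modify-write; assumes a single writer per job (its dispatcher thread).
    job = read_job(job_id)
    job.update(fields)
    _write_atomic(_job_path(job_id), job)


def complete_job(job_id: str, result) -> None:
    update_job(job_id, status="completed", result=result)


def fail_job(job_id: str, error: str) -> None:
    update_job(job_id, status="failed", error=error)


def recover_jobs() -> list[str]:
    # On startup, jobs still "running" or "retrying" were cut off by the restart;
    # their IDs can be resubmitted and will resume via the skip/cursor logic.
    interrupted = []
    for path in JOBS_DIR.glob("*.json"):
        job = json.loads(path.read_text())
        if job.get("status") in ("running", "retrying"):
            interrupted.append(job["job_id"])
    return interrupted
```

Progress fields are stored flat (`done`, `total`, `message`) and would be nested into the `progress` object only when the response is rendered.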

Function contract

Both ingestion functions and process execution functions accept an on_progress callback with a no-op default. Breaking changes to existing function signatures are acceptable at this stage.

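```python
from collections.abc import Callable
from pathlib import Path
from typing import Any

# (done, total, message) -> None. The `type` statement needs Python 3.12+;
# on older versions use `ProgressCallback: TypeAlias = Callable[[int, int, str], None]`.
type ProgressCallback = Callable[[int, int, str], None]

def download(
    start: str,
    end: str,
    bbox: list[float],
    dirname: str | Path,
    prefix: str,
    variable: str = "tg",
    overwrite: bool = False,
    on_progress: ProgressCallback = lambda *_: None,  # no-op unless a job is attached
) -> list[Path]:
    months = _months_in_range(start, end)
    for i, (year, month) in enumerate(months):
        _download_month(...)
        # Report after each month finishes, so `done` counts completed months.
        on_progress(i + 1, len(months), f"Downloaded {year}-{month:02d}")
    ...

def execute_resample(
    dataset_id: str,
    freq: str,
    on_progress: ProgressCallback = lambda *_: None,
) -> dict[str, Any]:
    chunks = _split_into_chunks(dataset_id)
    for i, chunk in enumerate(chunks):
        _resample_chunk(chunk, freq)
        on_progress(i + 1, len(chunks), f"Resampled chunk {i + 1}/{len(chunks)}")
    ...
```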

The no-op default means functions remain directly callable in tests and scripts without a job store.
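For example, a test can pass a list-appending callback and assert on the recorded events. In this sketch `months_in_range` is a hypothetical stand-in for the `_months_in_range` helper referenced above (assuming inclusive "YYYY-MM" bounds, which the issue does not specify), and `fake_download` is a network-free test double that follows the same progress contract.

```python
from collections.abc import Callable


def months_in_range(start: str, end: str) -> list[tuple[int, int]]:
    # Inclusive list of (year, month) pairs between two "YYYY-MM" strings.
    y, m = map(int, start.split("-"))
    end_y, end_m = map(int, end.split("-"))
    months = []
    while (y, m) <= (end_y, end_m):
        months.append((y, m))
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)
    return months


def fake_download(
    start: str,
    end: str,
    on_progress: Callable[[int, int, str], None] = lambda *_: None,
) -> int:
    # Test double for download(): no network, only the progress contract.
    months = months_in_range(start, end)
    for i, (year, month) in enumerate(months):
        on_progress(i + 1, len(months), f"Downloaded {year}-{month:02d}")
    return len(months)


# Each event is recorded as a (done, total, message) tuple.
events: list[tuple[int, int, str]] = []
fake_download("1990-01", "1990-03", on_progress=lambda *e: events.append(e))
```

`events` ends up as `[(1, 3, "Downloaded 1990-01"), (2, 3, "Downloaded 1990-02"), (3, 3, "Downloaded 1990-03")]`, and calling `fake_download` without a callback works unchanged. An inclusive range from 1990-01 to 2026-05 yields 437 months, the same order of magnitude as the progress total in the example above.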

Dispatcher

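```python
import time
from concurrent.futures import ThreadPoolExecutor

# POST handlers hand jobs to this pool, e.g. _executor.submit(_dispatch, func, kwargs, job_id),
# and return the job ID without waiting.
_executor = ThreadPoolExecutor()

def _dispatch(func, kwargs: dict, job_id: str, max_retries: int = 3, backoff_seconds: int = 60) -> None:
    # max_retries is the total number of attempts, including the first one.
    for attempt in range(1, max_retries + 1):
        # func calls this during the current attempt, so `attempt` is that attempt's number.
        def on_progress(done: int, total: int, message: str) -> None:
            update_job(job_id, done=done, total=total, message=message, attempt=attempt)
        try:
            result = func(**kwargs, on_progress=on_progress)
            complete_job(job_id, result=result)
            return
        except Exception as exc:  # retries every error, transient or not
            if attempt == max_retries:
                fail_job(job_id, error=str(exc))
                return
            # Exponential backoff; with the defaults the waits are 60 s, then 120 s
            # (the third failure marks the job failed instead of waiting).
            wait = backoff_seconds * (2 ** (attempt - 1))
            update_job(job_id, status="retrying", attempt=attempt, retry_after=wait)
            time.sleep(wait)
```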

Retry

Retry happens at the whole-job level in the dispatcher. This works because ingestion functions already skip existing files when overwrite=False, so a retry only downloads the months that are still missing, not the entire range.

Prerequisite: file validation before treating cached files as complete. A failed write leaves a corrupt stub that passes an existence check. The ingestion pipeline must validate cached files before skipping them:

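```python
from pathlib import Path

def _is_valid_cache_file(path: Path, min_bytes: int = 1024) -> bool:
    # Existence alone is not enough: the 48-byte stub from a failed OPeNDAP
    # write exists but falls far below min_bytes.
    return path.exists() and path.stat().st_size >= min_bytes
```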

With this in place, a retry sees the corrupt stub as missing, re-downloads it, and continues.
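The size check can be tightened cheaply by also checking the file signature, which rejects non-NetCDF content such as an HTML error page saved under a .nc name. A sketch, assuming cache files are NetCDF classic (CDF-1), 64-bit offset (CDF-2), CDF-5, or NetCDF-4 (HDF5 signature at offset 0). `is_valid_cache_file` and `needs_download` are hypothetical names. Neither check catches a large but truncated file; opening the file with the NetCDF reader would be the stronger (and slower) test.

```python
from pathlib import Path

# Leading bytes of NetCDF classic, 64-bit offset, CDF-5, and HDF5/NetCDF-4 files.
_NETCDF_SIGNATURES = (b"CDF\x01", b"CDF\x02", b"CDF\x05", b"\x89HDF\r\n\x1a\n")


def is_valid_cache_file(path: Path, min_bytes: int = 1024) -> bool:
    # Existence, minimum size (rejects the 48-byte stub), and a known signature.
    if not path.is_file() or path.stat().st_size < min_bytes:
        return False
    with path.open("rb") as f:
        header = f.read(8)
    return header.startswith(_NETCDF_SIGNATURES)


def needs_download(path: Path, overwrite: bool = False) -> bool:
    # A retry re-fetches anything missing or invalid and skips everything else.
    return overwrite or not is_valid_cache_file(path)
```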


Resume support

Resume applies to two distinct phases:

Download phase: already partially handled, since overwrite=False skips existing files. With the file validation described above, this becomes reliable.

Zarr build phase: not handled today. Building a zarr from 437 monthly files takes hours. If the build fails or the process is killed, the partially written zarr is unusable and the entire build restarts from scratch.

The solution is incremental zarr append (see also #64): build the zarr as each month downloads rather than in a single pass at the end. The job cursor tracks the last successfully appended month. On resume:

  1. Read the cursor: last_appended = 2014-12
  2. Skip months already in the zarr
  3. Continue appending from 2015-01

This makes the zarr the durable record. Monthly NetCDF cache files can be deleted after each successful append if disk space is a concern.

Connecting #64 to this issue: incremental zarr append is not just a performance improvement; it is the mechanism that makes the zarr build phase resumable.


Pre-download estimates

It would be useful to surface an estimated download size and duration before starting:

  • For remote Zarr sources (ERA5-Land via DestinE): fsspec supports HEAD requests with Content-Length, and Zarr's getsize_prefix() can aggregate chunk sizes for a given selection without downloading data
  • For other sources: file counts × typical per-period sizes give a coarser estimate
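The coarse estimate for non-Zarr sources is plain multiplication. A sketch, where `Estimate` and `coarse_estimate` are hypothetical names and the per-period size and duration are assumed to be supplied by each provider plugin (for example, from previous runs):

```python
from dataclasses import dataclass


@dataclass
class Estimate:
    periods: int
    bytes_total: int
    seconds_total: float

    @property
    def hours(self) -> float:
        return self.seconds_total / 3600


def coarse_estimate(periods: int, bytes_per_period: int, seconds_per_period: float) -> Estimate:
    # File count x typical per-period size and duration; both per-period
    # figures are provider-specific assumptions, not measured values.
    return Estimate(
        periods=periods,
        bytes_total=periods * bytes_per_period,
        seconds_total=periods * seconds_per_period,
    )
```

At the ~30 s/month quoted for seNorge, 437 monthly files would take 437 × 30 = 13,110 s, or about 3.6 hours, before any retries.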

The estimate could be surfaced as a field in the job response or through a dedicated GET /ingestions/estimate endpoint.


Out of scope for now

  • Parallel downloads (respecting provider rate limits)
  • Provider-side optimisation (batch period requests where the API supports it)
  • Full OGC API Processes async compliance (Parts 1–3)
