
Async processes and jobs: OGC API compliance, progress reporting, retry, and resume #111

@turban


Problem

Ingesting large historical time ranges is slow, fragile, and opaque:

  • Downloads run synchronously — POST /ingestions blocks until complete, timing out HTTP connections on multi-year backfills
  • A transient network error aborts the entire ingestion with no automatic retry
  • There is no visibility into progress or estimated completion time
  • The same problem exists for POST /processes/{id}/execution — both share the same blocking dispatch pattern

For dataset-specific timing, sizes, and failure modes that motivate this issue, see #64.


Decision

There should be a single /processes endpoint that is OGC API Processes compatible and supports async job execution. The current split — a FastAPI router at /processes for execution and pygeoapi at /ogcapi/processes for listing — should be collapsed into one.

The implementation should be native FastAPI, not pygeoapi. pygeoapi's BaseProcessor plugin interface is a poor fit for operational processes like ingest and sync, its job manager (TinyDB-backed) would become a parallel persistence layer alongside our own artifact records, and implementing the OGC API Processes standard natively for the subset we need is straightforward. pygeoapi's role shrinks to serving /collections (OGC API Coverages) only.

Both ingestion and process execution should use the same job abstraction. Designing them separately risks two diverging async patterns that plugin authors have to understand independently.

Current state

  • FastAPI has a /processes/{id}/execution route that executes synchronously and returns status: "completed"
  • pygeoapi is mounted at /ogcapi and provides GET /ogcapi/processes (listing) and GET /ogcapi/processes/{id} (description)
  • /ingestions and /sync are separate synchronous REST endpoints with no shared job lifecycle

Target state

A single /processes endpoint, implemented natively in FastAPI, that is OGC API Processes compliant:

GET  /processes                         → list all processes (ingest, sync, resample, …)
GET  /processes/{id}                    → process description
POST /processes/{id}/execution          → execute (async with Prefer: respond-async, otherwise sync)

GET  /jobs                              → list all jobs
GET  /jobs/{job_id}                     → job status (accepted | running | successful | failed | dismissed)
GET  /jobs/{job_id}/results             → output once done
DELETE /jobs/{job_id}                   → cancel job
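The job statuses form a small state machine. The sketch below assumes the OGC statusCode values, and that a DELETE maps to `dismissed`. It also assumes a job may fail straight from `accepted` if it cannot start. `JobStatus`, `ALLOWED` and `transition` are hypothetical names, not existing code:

```python
from enum import Enum


class JobStatus(str, Enum):
    ACCEPTED = "accepted"
    RUNNING = "running"
    SUCCESSFUL = "successful"
    FAILED = "failed"
    DISMISSED = "dismissed"  # OGC status for a job cancelled via DELETE /jobs/{job_id}


# Hypothetical transition table; terminal states have no outgoing edges.
ALLOWED: dict[JobStatus, set[JobStatus]] = {
    JobStatus.ACCEPTED: {JobStatus.RUNNING, JobStatus.FAILED, JobStatus.DISMISSED},
    JobStatus.RUNNING: {JobStatus.SUCCESSFUL, JobStatus.FAILED, JobStatus.DISMISSED},
    JobStatus.SUCCESSFUL: set(),
    JobStatus.FAILED: set(),
    JobStatus.DISMISSED: set(),
}


def transition(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return the new status, or raise if the move is not allowed."""
    if new not in ALLOWED[current]:
        raise ValueError(f"cannot move job from {current.value} to {new.value}")
    return new
```

Centralising the table means the job store, the dispatcher and the DELETE handler all reject the same illegal moves. One example is dismissing a job that has already succeeded.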

All long-running operations — ingestion, sync, resample, and any custom processes — are modelled as processes:

POST /processes/ingest/execution        { "inputs": { "dataset_id": "...", "start": "...", "end": "..." } }
POST /processes/sync/execution          { "inputs": { "dataset_id": "..." } }
POST /processes/resample/execution      { "inputs": { "source_dataset_id": "...", ... } }

Async execution follows the OGC standard: Prefer: respond-async → 201 Created with Location: /jobs/{job_id}. The existing synchronous behaviour can be kept as a fallback for simple clients (no Prefer header → block and return the result directly).

Job progress is returned on GET /jobs/{job_id}:

{
  "job_id": "abc123",
  "status": "running",
  "attempt": 1,
  "progress": {
    "done": 180,
    "total": 437,
    "percent": 41,
    "message": "Downloaded 2005-01"
  }
}
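The status document above can be built from the (done, total, message) triple that the progress callback reports. The sketch below (`status_document` is a hypothetical name) uses integer division for `percent` and guards against `total == 0` before the first callback fires:

```python
def status_document(
    job_id: str, status: str, attempt: int, done: int, total: int, message: str
) -> dict:
    # Integer percentage; 0 until the first progress update supplies a total.
    percent = done * 100 // total if total else 0
    return {
        "job_id": job_id,
        "status": status,
        "attempt": attempt,
        "progress": {"done": done, "total": total, "percent": percent, "message": message},
    }
```

With the example values, 180 of 437 gives 18000 // 437 = 41, matching the JSON above. The OGC statusInfo schema names the identifier `jobID` and has `progress` as an integer from 0 to 100 with a top-level `message`. A strictly compliant response may therefore need to map these fields.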

Jobs are persisted to disk so a server restart does not lose their state.

Consequences

  • The current /ingestions and /sync endpoints become legacy and should eventually be removed
  • pygeoapi's /ogcapi/processes is superseded by the native implementation; pygeoapi's role shrinks to serving /collections (OGC API Coverages)
  • This resolves the /processes path conflict described in #110 (feat: expose OGC API paths at top level alongside /stac and /zarr): there is only one processes surface

Function contract

Both ingestion functions and process execution functions accept an on_progress callback with a no-op default. Breaking changes are acceptable at this stage.

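from collections.abc import Callable
from pathlib import Path
from typing import Any

type ProgressCallback = Callable[[int, int, str], None]  # (done, total, message); Python 3.12+ syntax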

def download(
    start: str,
    end: str,
    bbox: list[float],
    dirname: str | Path,
    prefix: str,
    variable: str = "tg",
    overwrite: bool = False,
    on_progress: ProgressCallback = lambda *_: None,
) -> list[Path]:
    months = _months_in_range(start, end)
    for i, (year, month) in enumerate(months):
        _download_month(...)
        on_progress(i + 1, len(months), f"Downloaded {year}-{month:02d}")
    ...

def execute_resample(
    dataset_id: str,
    freq: str,
    on_progress: ProgressCallback = lambda *_: None,
) -> dict[str, Any]:
    chunks = _split_into_chunks(dataset_id)
    for i, chunk in enumerate(chunks):
        _resample_chunk(chunk, freq)
        on_progress(i + 1, len(chunks), f"Resampled chunk {i + 1}/{len(chunks)}")
    ...

The no-op default means functions remain directly callable in tests and scripts without a job store.
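To illustrate both call styles, the sketch below uses a hypothetical toy function, `count_months`, as a stand-in for `download`. It uses a plain assignment alias instead of the 3.12 `type` statement so it runs on older Pythons:

```python
from collections.abc import Callable

ProgressCallback = Callable[[int, int, str], None]


def count_months(months: list[str], on_progress: ProgressCallback = lambda *_: None) -> int:
    """Toy stand-in for download(): 'processes' each month and reports progress."""
    for i, month in enumerate(months):
        on_progress(i + 1, len(months), f"Downloaded {month}")
    return len(months)


# In a script or test: no job store needed, the no-op default swallows progress.
count_months(["2005-01", "2005-02"])

# Under the dispatcher: a collecting callback records every update.
updates: list[tuple[int, int, str]] = []
count_months(["2005-01", "2005-02"], on_progress=lambda d, t, m: updates.append((d, t, m)))
```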

Dispatcher

import time
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor()

def _dispatch(func, kwargs: dict, job_id: str, max_retries: int = 3, backoff_seconds: int = 60) -> None:
    for attempt in range(1, max_retries + 1):
        def on_progress(done: int, total: int, message: str) -> None:
            update_job(job_id, done=done, total=total, message=message, attempt=attempt)
        try:
            result = func(**kwargs, on_progress=on_progress)
            complete_job(job_id, result=result)
            return
        except Exception as exc:
            if attempt == max_retries:
                fail_job(job_id, error=str(exc))
                return
            wait = backoff_seconds * (2 ** (attempt - 1))  # 60s after attempt 1, 120s after attempt 2; the final attempt fails without waiting
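            # "retrying" is not an OGC statusCode, so the job stays "running" during backoff
            update_job(job_id, status="running", attempt=attempt, retry_after=wait, message=f"Attempt {attempt} failed: {exc}")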
            time.sleep(wait)
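The dispatcher depends on job-store helpers (`update_job`, `complete_job`, `fail_job`) that are not shown. Below is a self-contained sketch of the same whole-job retry loop. It makes three assumptions: a plain in-memory dict stands in for the store, `sleep` is injectable so tests need not wait, and the job stays `running` during backoff. `dispatch_with_retry` is a hypothetical name:

```python
import time
from collections.abc import Callable
from typing import Any


def dispatch_with_retry(
    func: Callable[..., Any],
    kwargs: dict[str, Any],
    jobs: dict[str, dict[str, Any]],
    job_id: str,
    max_retries: int = 3,
    backoff_seconds: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Run func with whole-job retry and exponential backoff, recording state in jobs[job_id]."""
    job = jobs[job_id]
    for attempt in range(1, max_retries + 1):
        job.update(status="running", attempt=attempt)

        def on_progress(done: int, total: int, message: str) -> None:
            job["progress"] = {"done": done, "total": total, "message": message}

        try:
            result = func(**kwargs, on_progress=on_progress)
        except Exception as exc:
            if attempt == max_retries:
                job.update(status="failed", error=str(exc))
                return
            wait = backoff_seconds * 2 ** (attempt - 1)  # 60s, then 120s with the defaults
            # No OGC status change: the job stays "running" while it waits.
            job.update(retry_after=wait, message=f"attempt {attempt} failed: {exc}")
            sleep(wait)
        else:
            job.update(status="successful", result=result)
            return
```

As written, every exception is retried, including permanent ones such as invalid parameters. Callers may want to catch those separately and fail immediately.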

Retry

Retry happens at the whole-job level in the dispatcher. This works because ingestion functions already skip existing files when overwrite=False and validate cached files before treating them as complete (see #64), so a retry only re-downloads what failed — not the entire range.
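Whole-job retry is only cheap if each unit of work is idempotent. The sketch below shows one way to get that for a single month's file. It writes to a temporary file and renames it atomically, so an interrupted download never leaves a truncated file that later looks complete. `fetch_month_once` and its non-empty-file validity check are hypothetical simplifications; the real validation rules are in #64:

```python
import os
import tempfile
from collections.abc import Callable
from pathlib import Path


def fetch_month_once(target: Path, fetch: Callable[[], bytes]) -> bool:
    """Download one file unless a valid copy exists. Returns True if it downloaded."""
    # Hypothetical validity check: a non-empty file counts as complete.
    if target.exists() and target.stat().st_size > 0:
        return False
    # Temp file in the same directory so os.replace is an atomic rename.
    fd, tmp = tempfile.mkstemp(dir=target.parent, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(fetch())
        os.replace(tmp, target)
    except BaseException:
        os.unlink(tmp)  # never leave partial data behind
        raise
    return True
```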

Resume support

Resume relies on two prerequisites, both specified in #64:

Pre-download estimates

A GET /processes/ingest/estimate endpoint (or a dedicated field in the job submission response) could surface estimated download size and duration before the job starts. The approach for computing estimates — using fsspec HEAD requests and Zarr's getsize_prefix() for remote Zarr sources, or file-count heuristics for others — is documented in #64.
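For sources without cheap HEAD requests, a file-count heuristic is just arithmetic over the months in the requested range. The sketch below assumes one file per month and a caller-supplied average file size and throughput. `months_in_range` and `estimate_ingest` are hypothetical names, and the sizes in the usage line are made-up inputs, not measured values:

```python
def months_in_range(start: str, end: str) -> list[tuple[int, int]]:
    """Inclusive (year, month) pairs between two 'YYYY-MM[-DD]' strings."""
    y, m = map(int, start.split("-")[:2])
    end_y, end_m = map(int, end.split("-")[:2])
    months = []
    while (y, m) <= (end_y, end_m):
        months.append((y, m))
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)
    return months


def estimate_ingest(start: str, end: str, bytes_per_month: int, bytes_per_second: float) -> dict:
    """Assumes one file per month of roughly bytes_per_month each."""
    files = len(months_in_range(start, end))
    total_bytes = files * bytes_per_month
    return {"files": files, "bytes": total_bytes, "seconds": total_bytes / bytes_per_second}
```

For example, `estimate_ingest("2005-01", "2005-12", 50_000_000, 10_000_000)` yields 12 files, 600 MB and about 60 seconds under those assumed rates.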


Implementation notes

  • FastAPI BackgroundTasks is sufficient for the MVP: submit, return 201 Created with a Location header, run in the background, persist state to JSON
  • The background_tasks=None argument already exists in create_artifact — the plumbing is partially there
  • Job state (accepted / running / successful / failed + error) can be stored in a jobs.json file alongside records.json
  • For production, a persistent task queue (ARQ, Celery) replaces BackgroundTasks without changing the API surface

Extensibility

Custom processes follow the same plugin pattern as dataset templates. User-supplied process YAML files live in plugins_dir/processes/ and are merged with the built-ins — a custom process with the same id overrides the built-in.

id: dengue_suitability
name: Dengue suitability index
execution_function: nepal_climate_tools.processes.dengue.run
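Merging by id and resolving `execution_function` can both be done with the standard library. The sketch below assumes the YAML files have already been parsed into dicts (for example with a YAML library) and that `execution_function` is an importable dotted path whose last segment is the function. `merge_processes` and `resolve_execution_function` are hypothetical names:

```python
import importlib
from collections.abc import Callable
from typing import Any


def merge_processes(builtins: list[dict], custom: list[dict]) -> dict[str, dict]:
    """Merge parsed process definitions by id; custom ones override built-ins."""
    registry = {p["id"]: p for p in builtins}
    registry.update({p["id"]: p for p in custom})
    return registry


def resolve_execution_function(dotted: str) -> Callable[..., Any]:
    """Import 'package.module.func' and return the function object."""
    module_path, _, attr = dotted.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)
```

Resolving at load time rather than per request means a typo in a plugin's dotted path surfaces at server startup.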

Processes can be flagged as internal so they run server-side (e.g. triggered by a post-sync cascade) without appearing in the public OGC API catalogue:

ogcapi:
  expose: false
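Hiding internal processes from the catalogue is a filter over the merged registry. The sketch below assumes `expose` defaults to true when the `ogcapi` block is absent or empty; `public_processes` is a hypothetical name:

```python
def public_processes(registry: dict[str, dict]) -> list[str]:
    """Ids to list on GET /processes; ogcapi.expose: false keeps a process internal."""
    return sorted(
        pid
        for pid, spec in registry.items()
        # An empty "ogcapi:" key parses to None, hence the "or {}".
        if (spec.get("ogcapi") or {}).get("expose", True)
    )
```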

Relationship to sync_kind: derived

Dataset templates can reference a process by id to produce a derived artifact on sync:

sync_kind: derived
processing:
  process_id: resample
  params:
    freq: MS

This decouples the dataset definition (what to produce and when) from the process definition (how to produce it). The same process can back multiple derived dataset templates.
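A derived sync could look up the process by id and pass the template's params through. The sketch below rests on two assumptions. The template names its input dataset in a `source_dataset_id` field, which the YAML above does not show. Process functions take `dataset_id` like `execute_resample`. `run_derived_sync` is a hypothetical name:

```python
from collections.abc import Callable
from typing import Any


def run_derived_sync(template: dict[str, Any], processes: dict[str, Callable[..., Any]]) -> Any:
    """Execute the process that a derived dataset template references."""
    if template.get("sync_kind") != "derived":
        raise ValueError("not a derived dataset template")
    processing = template["processing"]
    func = processes[processing["process_id"]]
    # Hypothetical: source_dataset_id is the template's input; params pass through unchanged.
    return func(dataset_id=template["source_dataset_id"], **processing.get("params", {}))
```

Two templates with different `params` (say `freq: MS` and another frequency) would then share one `resample` function.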

Out of scope for now

  • Parallel downloads (respecting provider rate limits)
  • Provider-side optimisation (batch period requests where the API supports it)
  • Full OGC API Processes async compliance (Parts 1–3)

Open questions

  • Runtime registration — should there be a POST /processes endpoint for registering a process without a server restart (OGC API Processes Part 2 — Deploy), or is file-based registration sufficient for the target user base?
  • Process versioning — if a custom process function changes, do existing derived artifacts need to be marked stale?
