diff --git a/climate_api/data/processes/resample.yaml b/climate_api/data/processes/resample.yaml index 6afaabd8..bded70af 100644 --- a/climate_api/data/processes/resample.yaml +++ b/climate_api/data/processes/resample.yaml @@ -9,6 +9,7 @@ - resampling jobControlOptions: - sync-execute + - async-execute inputs: source_dataset_id: type: string diff --git a/climate_api/data_registry/services/processes.py b/climate_api/data_registry/services/processes.py index 602ac004..406d607d 100644 --- a/climate_api/data_registry/services/processes.py +++ b/climate_api/data_registry/services/processes.py @@ -81,6 +81,14 @@ def get_process(process_id: str) -> dict[str, Any] | None: return {p["id"]: p for p in list_processes()}.get(process_id) +def get_process_function(process_id: str) -> Any: + """Import and return the execution callable for one registered process id.""" + process = get_process(process_id) + if process is None: + raise ValueError(f"Unknown process '{process_id}'") + return _get_dynamic_function(process["execution"]["function"]) + + def _load_builtin_processes() -> list[dict[str, Any]]: """Load built-in process definitions from package data via importlib.resources.""" pkg = importlib.resources.files("climate_api") / "data" / "processes" diff --git a/climate_api/jobs/__init__.py b/climate_api/jobs/__init__.py new file mode 100644 index 00000000..5cb2ac65 --- /dev/null +++ b/climate_api/jobs/__init__.py @@ -0,0 +1 @@ +"""Native job execution framework for process runs.""" diff --git a/climate_api/jobs/models.py b/climate_api/jobs/models.py new file mode 100644 index 00000000..5c23fea6 --- /dev/null +++ b/climate_api/jobs/models.py @@ -0,0 +1,94 @@ +"""Pydantic schemas and execution primitives for native jobs.""" + +from __future__ import annotations + +from datetime import datetime +from enum import StrEnum +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class JobCancelledError(Exception): + """Raised by a job implementation when cooperative cancellation is honored.""" + + +class JobStatus(StrEnum): + """Persisted lifecycle states for native jobs.""" + + ACCEPTED = "accepted" + RUNNING = "running" + RETRYING = "retrying" + SUCCESSFUL = "successful" + FAILED = "failed" + CANCELLED = "cancelled" + + +class JobType(StrEnum): + """Native job categories.""" + + PROCESS = "process" + + +class JobLink(BaseModel): + """Hypermedia link for a job resource.""" + + href: str + rel: str + title: str | None = None + + +class JobProgress(BaseModel): + """Progress details reported by a running job.""" + + done: int | None = None + total: int | None = None + percent: float | None = None + message: str | None = None + + +class JobError(BaseModel): + """Serialized error details for a failed job.""" + + type: str + message: str + + +class JobRecord(BaseModel): + """Persisted native job record.""" + + model_config = ConfigDict(populate_by_name=True) + + job_id: str = Field(validation_alias="jobID", serialization_alias="jobID") + process_id: str = Field(validation_alias="processID", serialization_alias="processID") + type: JobType = JobType.PROCESS + status: JobStatus + created_at: datetime = Field(validation_alias="createdAt", serialization_alias="createdAt") + started_at: datetime | None = Field(default=None, validation_alias="startedAt", serialization_alias="startedAt") + finished_at: datetime | None = Field(default=None, validation_alias="finishedAt", serialization_alias="finishedAt") + attempt: int = 0 + max_attempts: int = Field(default=1, validation_alias="maxAttempts", serialization_alias="maxAttempts") + executor_kind: str = Field(default="thread", validation_alias="executorKind", serialization_alias="executorKind") + request: dict[str, Any] = Field(default_factory=dict) + progress: JobProgress = Field(default_factory=JobProgress) + result: Any | None = None + error: JobError | None = None + cancel_requested: bool = Field( + default=False, + validation_alias="cancelRequested", + serialization_alias="cancelRequested", + ) + retry_after: int | None = Field( + default=None, + validation_alias="retryAfter", + serialization_alias="retryAfter", + ) + cursor: dict[str, Any] | None = None + links: list[JobLink] = Field(default_factory=list) + + +class JobListResponse(BaseModel): + """Envelope response for native jobs.""" + + jobs: list[JobRecord] = Field(default_factory=list) + links: list[JobLink] = Field(default_factory=list) diff --git a/climate_api/jobs/routes.py b/climate_api/jobs/routes.py new file mode 100644 index 00000000..ac002d1b --- /dev/null +++ b/climate_api/jobs/routes.py @@ -0,0 +1,26 @@ +"""HTTP routes for native job status and cancellation.""" + +from fastapi import APIRouter + +from climate_api.jobs.models import JobListResponse, JobRecord +from climate_api.jobs.service import get_job_service + +router = APIRouter() + + +@router.get("", response_model=JobListResponse) +def list_jobs() -> JobListResponse: + """Return all persisted native jobs.""" + return get_job_service().list_jobs() + + +@router.get("/{job_id}", response_model=JobRecord) +def get_job(job_id: str) -> JobRecord: + """Return one native job.""" + return get_job_service().get_job_or_404(job_id) + + +@router.delete("/{job_id}", response_model=JobRecord) +def cancel_job(job_id: str) -> JobRecord: + """Request cooperative cancellation for one native job.""" + return get_job_service().request_cancellation(job_id) diff --git a/climate_api/jobs/service.py b/climate_api/jobs/service.py new file mode 100644 index 00000000..4002cbaa --- /dev/null +++ b/climate_api/jobs/service.py @@ -0,0 +1,424 @@ +"""Runtime service for native asynchronous process jobs.""" + +from __future__ import annotations + +import inspect +import logging +import threading +import time +from concurrent.futures import Future, ThreadPoolExecutor +from typing import Any, Protocol +from uuid import uuid4 + +from fastapi import HTTPException + +from climate_api.data_registry.services import processes as process_registry +from climate_api.jobs import store +from climate_api.jobs.models import ( + JobCancelledError, + JobError, + JobLink, + JobListResponse, + JobProgress, + JobRecord, + JobStatus, +) +from climate_api.shared.time import utc_now + +logger = logging.getLogger(__name__) + + +def _retry_delay_seconds(attempt: int) -> int: + """Return the retry delay in seconds for a given failed attempt count.""" + exponent = attempt - 1 if attempt > 1 else 0 + return int(min(240, 60 * (2**exponent))) + + +def _job_links(job_id: str) -> list[JobLink]: + return [JobLink(href=f"/jobs/{job_id}", rel="self", title="Job detail")] + + +def _catalog_links() -> list[JobLink]: + return [JobLink(href="/jobs", rel="self", title="Jobs")] + + +def _supports_argument(func: Any, name: str) -> bool: + try: + signature = inspect.signature(func) + except (TypeError, ValueError): + return False + if name in signature.parameters: + return True + return any(parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in signature.parameters.values()) + + +def _is_pre_execution_cancellation(record: JobRecord) -> bool: + return record.cancel_requested and record.status in {JobStatus.ACCEPTED, JobStatus.RETRYING} + + +class JobExecutionContext: + """Callbacks and state helpers exposed to one running job.""" + + def __init__(self, service: "JobService", job_id: str) -> None: + self._service = service + self.job_id = job_id + + def report_progress(self, done: int | None = None, total: int | None = None, message: str | None = None) -> None: + self._service.update_progress(self.job_id, done=done, total=total, message=message) + + def is_cancel_requested(self) -> bool: + record = store.get_job_record(self.job_id) + return bool(record and record.cancel_requested) + + def save_cursor(self, cursor: dict[str, Any]) -> None: + self._service.save_cursor(self.job_id, cursor) + + def load_cursor(self) -> dict[str, Any] | None: + record = store.get_job_record(self.job_id) + return None if record is None else record.cursor + + +class ProcessExecutor(Protocol): + """Execution backend contract for native jobs.""" + + kind: str + + def submit(self, fn: Any, /, *args: Any, **kwargs: Any) -> Future[None]: + """Submit one callable for asynchronous execution.""" + ... + + def shutdown(self) -> None: + """Release executor resources.""" + ... + + +class ThreadProcessExecutor: + """Default in-process thread-backed job executor.""" + + kind = "thread" + + def __init__(self, *, max_workers: int = 4) -> None: + self._pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="climate-api-job") + + def submit(self, fn: Any, /, *args: Any, **kwargs: Any) -> Future[None]: + """Submit one callable to the thread pool.""" + return self._pool.submit(fn, *args, **kwargs) + + def shutdown(self) -> None: + """Stop the thread pool without waiting for queued work to finish.""" + self._pool.shutdown(wait=False, cancel_futures=True) + + +class JobService: + """Persisted job store plus in-process executor runtime.""" + + def __init__(self, *, executor: ProcessExecutor | None = None, max_workers: int = 4) -> None: + self._executor = executor or ThreadProcessExecutor(max_workers=max_workers) + self._futures: dict[str, Future[None]] = {} + self._lock = threading.Lock() + + def shutdown(self) -> None: + """Stop the executor without waiting for outstanding work.""" + self._executor.shutdown() + + def list_jobs(self) -> JobListResponse: + """Return all persisted jobs ordered by creation time descending.""" + records = sorted(store.list_job_records(), key=lambda record: record.created_at, reverse=True) + return JobListResponse(jobs=records, links=_catalog_links()) + + def get_job_or_404(self, job_id: str) -> JobRecord: + """Return one persisted job or raise 404.""" + record = store.get_job_record(job_id) + if record is None: + raise HTTPException(status_code=404, detail=f"Job '{job_id}' not found") + return record + + def submit_process_job( + self, + *, + process_id: str, + request: dict[str, Any], + max_attempts: int = 1, + ) -> JobRecord: + """Create and asynchronously submit one process execution job.""" + process = process_registry.get_process(process_id) + if process is None or not process["expose"]: + raise HTTPException(status_code=404, detail=f"Unknown process '{process_id}'") + + job_id = str(uuid4()) + record = JobRecord( + job_id=job_id, + process_id=process_id, + status=JobStatus.ACCEPTED, + created_at=utc_now(), + max_attempts=max_attempts, + executor_kind=self._executor.kind, + request=request, + links=_job_links(job_id), + ) + store.create_job_record(record) + self._enqueue_job(record.job_id) + return self.get_job_or_404(record.job_id) + + def request_cancellation(self, job_id: str) -> JobRecord: + """Request cooperative cancellation for a job.""" + self.get_job_or_404(job_id) + record = store.mutate_job_record( + job_id, + lambda current: ( + current + if current.status in {JobStatus.SUCCESSFUL, JobStatus.FAILED, JobStatus.CANCELLED} + else current.model_copy(update={"cancel_requested": True}) + ), + ) + + if record.status == JobStatus.ACCEPTED: + with self._lock: + future = self._futures.get(job_id) + if future is not None and future.cancel(): + record = store.mutate_job_record( + job_id, + lambda current: current.model_copy( + update={ + "status": JobStatus.CANCELLED, + "finished_at": utc_now(), + "progress": JobProgress(message="Cancellation accepted before execution started"), + } + ), + ) + return record + + def recover_pending_jobs(self) -> None: + """Requeue interrupted jobs on startup.""" + for record in store.list_job_records(): + if record.status not in {JobStatus.ACCEPTED, JobStatus.RUNNING, JobStatus.RETRYING}: + continue + if record.cancel_requested: + store.mutate_job_record( + record.job_id, + lambda current: current.model_copy( + update={ + "status": JobStatus.CANCELLED, + "finished_at": utc_now(), + "progress": JobProgress(message="Cancelled before recovery requeue"), + } + ), + ) + continue + if record.status == JobStatus.RUNNING: + store.mutate_job_record( + record.job_id, + lambda current: current.model_copy( + update={ + "status": JobStatus.ACCEPTED, + "attempt": max(0, current.attempt - 1), + "finished_at": None, + "retry_after": None, + "error": None, + "progress": JobProgress(message="Requeued after restart during execution"), + } + ), + ) + self._enqueue_job(record.job_id) + + def update_progress( + self, + job_id: str, + *, + done: int | None = None, + total: int | None = None, + message: str | None = None, + ) -> JobRecord: + """Persist progress details for one job.""" + + def _mutation(current: JobRecord) -> JobRecord: + current_done = done if done is not None else current.progress.done + current_total = total if total is not None else current.progress.total + percent: float | None = current.progress.percent + if current_done is not None and current_total is not None and current_total != 0: + percent = round((current_done / current_total) * 100.0, 2) + progress = JobProgress( + done=current_done, + total=current_total, + percent=percent, + message=message if message is not None else current.progress.message, + ) + return current.model_copy(update={"progress": progress}) + + return store.mutate_job_record(job_id, _mutation) + + def save_cursor(self, job_id: str, cursor: dict[str, Any]) -> JobRecord: + """Persist a lightweight checkpoint cursor for one job.""" + return store.mutate_job_record(job_id, lambda current: current.model_copy(update={"cursor": dict(cursor)})) + + def _enqueue_job(self, job_id: str) -> None: + with self._lock: + existing = self._futures.get(job_id) + if existing is not None and not existing.done(): + return + future = self._executor.submit(self._run_job, job_id) + self._futures[job_id] = future + + def _sleep_for_retry(self, job_id: str, seconds: int) -> bool: + """Sleep in short intervals so retry wait remains cancellation-aware.""" + remaining = float(seconds) + while remaining > 0: + record = store.get_job_record(job_id) + if record is not None and record.cancel_requested: + return False + interval = min(1.0, remaining) + time.sleep(interval) + remaining -= interval + return True + + def _run_job(self, job_id: str) -> None: + try: + self._execute_job(job_id) + finally: + with self._lock: + self._futures.pop(job_id, None) + + def _execute_job(self, job_id: str) -> None: + while True: + record = self.get_job_or_404(job_id) + if _is_pre_execution_cancellation(record): + message = ( + "Cancellation accepted before execution started" + if record.status == JobStatus.ACCEPTED + else "Cancelled before retry execution resumed" + ) + store.mutate_job_record( + job_id, + lambda current: current.model_copy( + update={ + "status": JobStatus.CANCELLED, + "finished_at": utc_now(), + "progress": JobProgress(message=message), + } + ), + ) + return + + started = store.mutate_job_record( + job_id, + lambda current: current.model_copy( + update={ + "status": JobStatus.RUNNING, + "started_at": current.started_at or utc_now(), + "finished_at": None, + "attempt": current.attempt + 1, + "retry_after": None, + "error": None, + } + ), + ) + + try: + result = self._invoke_process(started) + store.mutate_job_record( + job_id, + lambda current: current.model_copy( + update={ + "status": JobStatus.SUCCESSFUL, + "finished_at": utc_now(), + "result": result, + "progress": JobProgress( + done=current.progress.done, + total=current.progress.total, + percent=current.progress.percent, + message="Completed", + ), + } + ), + ) + return + except JobCancelledError: + store.mutate_job_record( + job_id, + lambda current: current.model_copy( + update={ + "status": JobStatus.CANCELLED, + "finished_at": utc_now(), + "progress": JobProgress( + done=current.progress.done, + total=current.progress.total, + percent=current.progress.percent, + message="Cancelled", + ), + } + ), + ) + return + except Exception as exc: + logger.exception("Job %s failed", job_id) + error = JobError(type=type(exc).__name__, message=str(exc)) + if started.attempt < started.max_attempts: + retry_after = _retry_delay_seconds(started.attempt) + store.mutate_job_record( + job_id, + lambda latest: latest.model_copy( + update={ + "status": JobStatus.RETRYING, + "retry_after": retry_after, + "error": error, + "progress": JobProgress(message="Retry scheduled"), + } + ), + ) + if not self._sleep_for_retry(job_id, retry_after): + continue + continue + + store.mutate_job_record( + job_id, + lambda latest: latest.model_copy( + update={ + "status": JobStatus.FAILED, + "finished_at": utc_now(), + "error": error, + "progress": JobProgress( + done=latest.progress.done, + total=latest.progress.total, + percent=latest.progress.percent, + message="Failed", + ), + } + ), + ) + return + + def _invoke_process(self, record: JobRecord) -> Any: + process = process_registry.get_process(record.process_id) + if process is None or not process["expose"]: + raise ValueError(f"Unknown process '{record.process_id}'") + func = process_registry.get_process_function(record.process_id) + context = JobExecutionContext(self, record.job_id) + kwargs = dict(record.request) + if _supports_argument(func, "on_progress"): + kwargs["on_progress"] = context.report_progress + if _supports_argument(func, "is_cancel_requested"): + kwargs["is_cancel_requested"] = context.is_cancel_requested + if _supports_argument(func, "load_cursor"): + kwargs["load_cursor"] = context.load_cursor + if _supports_argument(func, "save_cursor"): + kwargs["save_cursor"] = context.save_cursor + return func(**kwargs) + + +_job_service: JobService | None = None + + +def get_job_service() -> JobService: + """Return the singleton native job runtime.""" + global _job_service + if _job_service is None: + _job_service = JobService() + return _job_service + + +def reset_job_service() -> None: + """Reset the singleton runtime for tests.""" + global _job_service + if _job_service is not None: + _job_service.shutdown() + _job_service = None diff --git a/climate_api/jobs/store.py b/climate_api/jobs/store.py new file mode 100644 index 00000000..8417eaf5 --- /dev/null +++ b/climate_api/jobs/store.py @@ -0,0 +1,126 @@ +"""JSON-backed persistence for native job records.""" + +from __future__ import annotations + +import json +import os +from collections.abc import Callable +from pathlib import Path + +import portalocker + +from climate_api import config as api_config +from climate_api.jobs.models import JobRecord + + +def _resolve_jobs_dir() -> Path: + data_dir = api_config.get_data_dir() + if data_dir is not None: + return data_dir / "jobs" + xdg_data = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) + return xdg_data / "climate-api" / "jobs" + + +JOBS_DIR = _resolve_jobs_dir() +JOBS_INDEX_PATH = JOBS_DIR / "jobs.json" + + +def ensure_store() -> None: + """Create the jobs metadata store if it does not exist.""" + JOBS_DIR.mkdir(parents=True, exist_ok=True) + if not JOBS_INDEX_PATH.exists(): + JOBS_INDEX_PATH.write_text("[]\n", encoding="utf-8") + + +def list_job_records() -> list[JobRecord]: + """Return all persisted job records.""" + return [JobRecord.model_validate(raw) for raw in _load_records()] + + +def get_job_record(job_id: str) -> JobRecord | None: + """Return one job record if present.""" + for raw in _load_records(): + if raw.get("job_id") == job_id: + return JobRecord.model_validate(raw) + return None + + +def create_job_record(record: JobRecord) -> JobRecord: + """Persist a newly created job record.""" + + def _mutation(records: list[dict[str, object]]) -> JobRecord: + if any(existing.get("job_id") == record.job_id for existing in records): + raise ValueError(f"Job '{record.job_id}' already exists") + records.append(record.model_dump(mode="json")) + return record + + return _mutate_records(_mutation) + + +def upsert_job_record(record: JobRecord) -> JobRecord: + """Persist the full replacement state for one job.""" + + def _mutation(records: list[dict[str, object]]) -> JobRecord: + payload = record.model_dump(mode="json") + for index, existing in enumerate(records): + if existing.get("job_id") == record.job_id: + records[index] = payload + return record + records.append(payload) + return record + + return _mutate_records(_mutation) + + +def mutate_job_record(job_id: str, mutation: Callable[[JobRecord], JobRecord]) -> JobRecord: + """Load, mutate, and persist one existing job record.""" + + def _apply(records: list[dict[str, object]]) -> JobRecord: + for index, existing in enumerate(records): + if existing.get("job_id") != job_id: + continue + current = JobRecord.model_validate(existing) + updated = mutation(current) + records[index] = updated.model_dump(mode="json") + return updated + raise KeyError(job_id) + + return _mutate_records(_apply) + + +def _load_records() -> list[dict[str, object]]: + ensure_store() + return _read_records_from_disk() + + +def _read_records_from_disk() -> list[dict[str, object]]: + with open(JOBS_INDEX_PATH, encoding="utf-8") as handle: + portalocker.lock(handle, portalocker.LOCK_SH) + try: + payload = json.load(handle) + finally: + portalocker.unlock(handle) + if not isinstance(payload, list): + raise ValueError("jobs.json must contain a list") + if not all(isinstance(item, dict) for item in payload): + raise ValueError("jobs.json must only contain objects") + return payload + + +def _mutate_records(mutation: Callable[[list[dict[str, object]]], JobRecord]) -> JobRecord: + ensure_store() + with open(JOBS_INDEX_PATH, "r+", encoding="utf-8") as handle: + portalocker.lock(handle, portalocker.LOCK_EX) + try: + payload = json.load(handle) + if not isinstance(payload, list): + raise ValueError("jobs.json must contain a list") + records = payload + result = mutation(records) + handle.seek(0) + json.dump(records, handle, indent=2) + handle.write("\n") + handle.truncate() + return result + finally: + portalocker.unlock(handle) diff --git a/climate_api/main.py b/climate_api/main.py index 5b480b8d..d298a25b 100644 --- a/climate_api/main.py +++ b/climate_api/main.py @@ -1,7 +1,8 @@ """DHIS2 Climate API -- Climate and earth observation data API for DHIS2.""" import os -from collections.abc import Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable +from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware @@ -11,6 +12,8 @@ from climate_api.data_registry import routes as dataset_template_routes from climate_api.extents import routes as extent_routes from climate_api.ingestions import routes as ingestion_routes +from climate_api.jobs import routes as job_routes +from climate_api.jobs.service import get_job_service from climate_api.processing import routes as processing_routes from climate_api.pygeoapi_app import mount_pygeoapi from climate_api.stac import routes as stac_routes @@ -35,6 +38,17 @@ def _append_vary_value(response: Response, value: str) -> None: response.headers["Vary"] = ", ".join([*values, value]) +@asynccontextmanager +async def _lifespan(_app: FastAPI) -> AsyncIterator[None]: + """Run lightweight startup recovery hooks for the application lifecycle.""" + job_service = get_job_service() + job_service.recover_pending_jobs() + try: + yield + finally: + job_service.shutdown() + + def create_app() -> FastAPI: """Create and configure the Climate API FastAPI application. @@ -43,7 +57,7 @@ def create_app() -> FastAPI: from climate_api.main import create_app app = create_app() """ - _app = FastAPI() + _app = FastAPI(lifespan=_lifespan) _app.add_middleware( CORSMiddleware, @@ -89,6 +103,7 @@ async def add_zarr_browser_access_headers( _app.include_router(dataset_template_routes.router, prefix="/dataset-templates", tags=["Dataset templates"]) _app.include_router(ingestion_routes.datasets_router, prefix="/datasets", tags=["Datasets"]) _app.include_router(ingestion_routes.ingestions_router, prefix="/ingestions", tags=["Ingestions"]) + _app.include_router(job_routes.router, prefix="/jobs", tags=["Jobs"]) _app.include_router(ingestion_routes.zarr_router, prefix="/zarr", tags=["Zarr"]) _app.include_router(ingestion_routes.sync_router, prefix="/sync", tags=["Sync"]) _app.include_router(processing_routes.router, prefix="/processes", tags=["Processes"]) diff --git a/climate_api/processing/routes.py b/climate_api/processing/routes.py index 2041abbd..469e6367 100644 --- a/climate_api/processing/routes.py +++ b/climate_api/processing/routes.py @@ -2,14 +2,24 @@ from typing import Any -from fastapi import APIRouter, Body, HTTPException +from fastapi import APIRouter, Body, Header, HTTPException, Response from climate_api.data_registry.services import processes as process_registry +from climate_api.jobs.service import get_job_service +from climate_api.processing import services as processing_services from climate_api.processing.schemas import ProcessDetail, ProcessField, ProcessLink, ProcessListResponse, ProcessSummary router = APIRouter() +def _prefer_respond_async(prefer: str | None) -> bool: + """Return True when Prefer contains a respond-async directive.""" + if prefer is None: + return False + directives = [item.strip().split(";", 1)[0].strip().lower() for item in prefer.split(",")] + return "respond-async" in directives + + def _process_links(process_id: str) -> list[ProcessLink]: return [ ProcessLink(href=f"/processes/{process_id}", rel="self", title="Process detail"), @@ -75,6 +85,31 @@ def _get_public_process_or_404(process_id: str) -> dict[str, Any]: return process +def _validate_required_process_inputs(process: dict[str, Any], request: dict[str, Any]) -> None: + raw_inputs = process.get("inputs") + if not isinstance(raw_inputs, dict): + return + missing = [ + key + for key, value in raw_inputs.items() + if isinstance(key, str) and isinstance(value, dict) and value.get("required") is True and key not in request + ] + if missing: + joined = ", ".join(sorted(missing)) + raise HTTPException(status_code=400, detail=f"Missing required process inputs: {joined}") + + +def _validate_process_request(process: dict[str, Any], request: dict[str, Any]) -> None: + _validate_required_process_inputs(process, request) + if process.get("id") == "resample": + processing_services.validate_resample_request(**request) + + +def _supports_async_execution(process: dict[str, Any]) -> bool: + job_control_options = process.get("jobControlOptions") + return isinstance(job_control_options, list) and "async-execute" in job_control_options + + @router.get("", response_model=ProcessListResponse) def get_processes_catalog() -> ProcessListResponse: """Return the registered native process catalog.""" @@ -95,12 +130,26 @@ def get_process(process_id: str) -> ProcessDetail: @router.post("/{process_id}/execution") def run_process_execution( process_id: str, + response: Response, request: dict[str, Any] = Body(...), + prefer: str | None = Header(default=None), ) -> Any: """Dispatch to a registered process execution function by process id.""" process = _get_public_process_or_404(process_id) + _validate_process_request(process, request) + if _prefer_respond_async(prefer): + if not _supports_async_execution(process): + raise HTTPException( + status_code=409, + detail=f"Process '{process_id}' does not support async execution", + ) + job = get_job_service().submit_process_job(process_id=process_id, request=request) + response.status_code = 202 + response.headers["Location"] = f"/jobs/{job.job_id}" + return job + try: - func = process_registry._get_dynamic_function(process["execution"]["function"]) + func = process_registry.get_process_function(process_id) except (ImportError, AttributeError, ValueError) as exc: raise HTTPException( status_code=500, diff --git a/climate_api/processing/services.py b/climate_api/processing/services.py index 590506b5..7e9691a5 100644 --- a/climate_api/processing/services.py +++ b/climate_api/processing/services.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections.abc import Callable from typing import Any import pandas as pd @@ -9,11 +10,47 @@ from climate_api.ingestions import services as ingestion_services from climate_api.ingestions.schemas import DatasetRecord +from climate_api.jobs.models import JobCancelledError from climate_api.processing.resample import materialize_resampled_artifact _SUPPORTED_RESAMPLE_METHODS: frozenset[str] = frozenset({"mean", "sum", "min", "max"}) +def _noop_progress(_done: int | None = None, _total: int | None = None, _message: str | None = None) -> None: + """Default no-op progress callback.""" + + +def _never_cancel() -> bool: + """Default cancellation probe for synchronous execution.""" + return False + + +def validate_resample_request( + *, + source_dataset_id: str, + frequency: str | None, + method: str | None, + start: str | None, + **_ignored: Any, +) -> None: + """Validate resample request parameters before execution or async acceptance.""" + if not source_dataset_id: + raise HTTPException(status_code=400, detail="source_dataset_id is required") + if not frequency: + raise HTTPException(status_code=400, detail="frequency is required") + if not start: + raise HTTPException(status_code=400, detail="start is required") + if method not in _SUPPORTED_RESAMPLE_METHODS: + supported = ", ".join(sorted(_SUPPORTED_RESAMPLE_METHODS)) + raise HTTPException(status_code=400, detail=f"Unsupported method '{method}'. Supported: {supported}") + try: + _offset = pd.tseries.frequencies.to_offset(frequency) + except (TypeError, ValueError): + _offset = None + if _offset is None: + raise HTTPException(status_code=400, detail=f"Invalid frequency '{frequency}'") + + def execute_resample( *, source_dataset_id: str, @@ -23,23 +60,26 @@ def execute_resample( end: str | None = None, overwrite: bool = False, publish: bool = True, + on_progress: Callable[[int | None, int | None, str | None], None] | None = None, + is_cancel_requested: Callable[[], bool] | None = None, + load_cursor: Callable[[], dict[str, Any] | None] | None = None, + save_cursor: Callable[[dict[str, Any]], None] | None = None, **_ignored: Any, ) -> dict[str, Any]: - """Validate and execute the resample process; return a JSON-serializable response dict. + """Execute the resample process; return a JSON-serializable response dict. - This is the execution_function registered in the process YAML. The route dispatches - here by calling it with the raw request body as keyword arguments. + This is the execution_function registered in the process YAML. Public route handlers + perform request validation before dispatching here. """ - if method not in _SUPPORTED_RESAMPLE_METHODS: - supported = ", ".join(sorted(_SUPPORTED_RESAMPLE_METHODS)) - raise HTTPException(status_code=400, detail=f"Unsupported method '{method}'. Supported: {supported}") - try: - _offset = pd.tseries.frequencies.to_offset(frequency) - except ValueError: - _offset = None - if _offset is None: - raise HTTPException(status_code=400, detail=f"Invalid frequency '{frequency}'") + progress = on_progress or _noop_progress + cancel_requested = is_cancel_requested or _never_cancel + _ = load_cursor, save_cursor + + progress(0, 2, "Validating request") + if cancel_requested(): + raise JobCancelledError() + progress(1, 2, "Materializing derived dataset") artifact_id, dataset_summary = run_resample_process( source_dataset_id=source_dataset_id, frequency=frequency, @@ -49,6 +89,9 @@ def execute_resample( overwrite=overwrite, publish=publish, ) + if cancel_requested(): + raise JobCancelledError() + progress(2, 2, "Completed") return { "artifact_id": artifact_id, "status": "completed", diff --git a/docs/internal/clim-703-implementation-review.md b/docs/internal/clim-703-implementation-review.md new file mode 100644 index 00000000..291a77fa --- /dev/null +++ b/docs/internal/clim-703-implementation-review.md @@ -0,0 +1,96 @@ +# Review: CLIM-703 Implementation + +Reviewed against `docs/internal/issue-111-async-jobs-design.md`, GitHub issues `#111` and `#64`, +and the current branch state after the final cleanup pass. + +--- + +## Outcome + +`CLIM-703` is ready for PR. + +The native async job framework slice described in `#111` is implemented, lint-clean, and covered +by the current test suite. + +Validation signal on this branch: + +- `make lint` passes +- `make test` passes +- result: `247 passed, 1 skipped` + +--- + +## Acceptance scope + +Every item marked in scope for the first slice is delivered: + +| Requirement | Status | +|---|---| +| `jobs.json` store under `data_dir` | ✓ | +| Job schemas and mutation helpers | ✓ | +| `GET /jobs` | ✓ | +| `GET /jobs/{job_id}` | ✓ | +| `GET /jobs/{job_id}/results` explicitly deferred | ✓ | +| `POST /processes/{id}/execution` async negotiation | ✓ | +| `202 Accepted` + `Location` header | ✓ | +| Synchronous fallback | ✓ | +| `DELETE /jobs/{job_id}` cooperative cancellation | ✓ | +| Retry state and attempt tracking | ✓ | +| Startup recovery | ✓ | +| Executor abstraction | ✓ | +| `resample` wired through async path | ✓ | + +--- + +## Design divergences worth keeping in mind + +These are not blockers for the PR. + +- `ProcessExecutor.submit(...)` follows the `concurrent.futures.Executor` shape rather than the + earlier design sketch. This is a better split of concerns: retry and job semantics remain in the + runtime. +- `JobRecord.process_id` is currently non-nullable. That is fine for the current process-backed + job model, but future non-process job types may require a migration. +- `error` is represented as a structured `JobError` object rather than a plain string. This is an + improvement over the original design note. + +--- + +## Remaining non-blocking follow-up topics + +- `time.sleep(retry_after)` still occupies a thread-pool worker during retry delay. That is + acceptable for the current MVP but should be revisited before adding higher-concurrency + ingestion jobs. +- `resample.yaml` method enum values and Python-side supported-method constants are still kept in + sync manually. +- `JobRecord.process_id` should be revisited when ingestion and other non-process job types move + onto the framework. + +--- + +## Relationship to `#64` + +`CLIM-703` and `#64` address different layers of the same problem and compose cleanly. + +### What `CLIM-703` now owns + +- persisted job lifecycle +- submission / transition / retry / recovery orchestration +- progress reporting +- cooperative cancellation +- cursor/checkpoint hooks in the runtime contract + +### What `#64` still owns + +- meaningful data-plane checkpoints +- incremental/resumable materialization behaviour +- efficient resume after interruption during large ingest/build workflows + +### Current meaning of the `resample` integration + +`resample` now runs through the native job framework and declares the cursor hooks explicitly, even +though it does not use them yet. That makes the contract visible and keeps the runtime shape ready +for future resumable processes. + +Until a resumable ingest/build implementation lands, restart recovery correctly re-runs interrupted +jobs from the beginning. diff --git a/docs/internal/issue-111-async-jobs-design.md b/docs/internal/issue-111-async-jobs-design.md new file mode 100644 index 00000000..7fe8fb49 --- /dev/null +++ b/docs/internal/issue-111-async-jobs-design.md @@ -0,0 +1,469 @@ +# Issue 111 Design + +## Title + +Async processes and jobs: OGC API compliance, progress reporting, retry, and resume + +## Purpose + +This note replaces the older `#89` framing and aligns implementation planning with the current repository state after `CLIM-689`. + +The most important clarification is: + +- `CLIM-689` already delivered the native process surface +- Issue `#111` should now focus on the native job and async execution framework built on top of that surface + +This means `#111` is no longer a mixed “process registry + jobs” ticket. The process registry and native `/processes` discovery/execution baseline already exist. + +## Current Status + +`CLIM-703` has now implemented the first native job-framework slice described here: + +- persisted `jobs.json` store +- `/jobs` list/detail/cancel endpoints +- async process execution via `Prefer: respond-async` +- synchronous fallback when `Prefer` is absent +- cooperative cancellation +- retry with `retryAfter` delay semantics (`60s`, `120s`, `240s`) +- startup recovery for `accepted`, `running`, and `retrying` jobs +- executor abstraction with default in-process thread-backed implementation +- `resample` wired through the framework as the first concrete process + +What remains after `CLIM-703` is downstream adoption and refinement rather than first-framework construction: + +- ingestion migration onto the native job framework +- sync migration onto the native job framework +- richer executor backends if needed +- true resumable build behavior through `#64` + +## One-Sentence Model + +A **process** defines what operation can be run. + +A **job** is one concrete execution instance of that process or of another long-running operation. + +An **executor** defines how that concrete job is run. + +## What 689 Already Delivered + +`CLIM-689` established: + +- native `GET /processes` +- native `GET /processes/{id}` +- native `POST /processes/{id}/execution` +- YAML-backed process registry +- OGC-oriented process response shapes +- `resample` as the first built-in published process + +That work should be treated as an already-satisfied prerequisite, not as open scope for `#111`. + +## What 111 Should Mean Now + +Issue `#111` should implement the **job framework layer**: + +- async submission +- persisted job state +- polling +- progress reporting +- cancellation semantics +- retry semantics +- restart recovery / resumability hooks +- pluggable execution backends + +It should not reopen: + +- native FastAPI vs pygeoapi ownership of `/processes` +- process registry design +- process discovery shape + +## Decision Summary + +Build one **native FastAPI job layer** shared by: + +- process execution +- later ingestion, via its own migration ticket +- later sync, via its own migration ticket + +This job layer should: + +- persist state to disk +- support async submission using `Prefer: respond-async` +- keep synchronous fallback initially +- expose generic `/jobs` endpoints +- define framework-level cancellation, retry, and restart-recovery behavior +- provide an executor abstraction so specific job types can choose how work is run +- evolve toward OGC API Processes job semantics where relevant + +## Relationship to Other Work + +### `CLIM-689` + +Already done and foundational. + +`#111` extends that work rather than replacing or closing over it conceptually. + +### Issue `#40` + +This is enabling infrastructure for extensibility. Custom processes become much more operationally credible once they can run as jobs with progress and durable state. + +### `CLIM-681` + +Normals/anomalies are exactly the kind of derived workflows that benefit from a job model. That is why `#111` should come before or alongside serious `681` implementation. + +### `#64` + +`#64` is the real prerequisite for true resumable Zarr build behavior. `#111` can lay the job framework and resume hooks, but it should not claim that full build resume is solved without incremental append/checkpoint support. + +### `#107` + +`#107` (`_frequency_to_iso_resolution` loses the multiplier) is not a hard prerequisite for the job framework itself, but it is a recommended precondition if `resample` is the first built-in process used to validate the async path end to end. Otherwise `#111` would be proving the framework through a process that still has known metadata bugs. + +### Separate ingestion/sync migration tickets + +Issue `#111` should build the shared job framework once. + +Separate tickets should: + +- implement ingestion as a first-class process on the native job framework +- implement sync as a first-class process on the native job framework + +That keeps the framework ticket focused while still making it responsible for the cross-cutting semantics those later migrations depend on. + +The shared framework part of that work is now in place; these migration tickets remain the next major consumers. + +## Current Repo Reality + +This section captures the pre-migration baseline that motivated the design. Process execution is no longer purely synchronous in the current branch state because `CLIM-703` added async job execution for native processes. + +### Process execution today + +`POST /processes/{id}/execution` supports both: + +- synchronous execution by default +- async execution when `Prefer: respond-async` is sent + +- route: `climate_api/processing/routes.py` +- built-in example: `climate_api.processing.services.execute_resample` + +### Ingestion today + +`POST /ingestions` is synchronous: + +- route: `climate_api/ingestions/routes.py` +- service: `climate_api/ingestions/services.py:create_artifact` + +### Persistence today + +- artifact records exist in `records.json` +- file locking already exists for artifact mutation +- there is no persisted job store yet + +## Scope Recommendation + +Issue `#111` should build the framework once, even if only `resample` is wired through it in the first implementation. + +That means: + +- do implement the shared job domain and runtime semantics in `#111` +- do not migrate ingestion or sync into first-class processes inside `#111` +- use `resample` as the first concrete built-in process that exercises the framework + +Recommended phases within and around this work: + +- Phase 1: persisted job store, `/jobs` endpoints, and async process execution +- Phase 2: cooperative cancellation and retry semantics in the framework +- Phase 3: restart recovery and resumability hooks + +Downstream tickets enabled by this work: + +- ingestion migration onto the framework +- sync migration onto the framework + +## Recommended MVP Boundary + +The first implementation slice should still be deliberate, but the framework design should not paint us into a corner. + +### In scope + +- `jobs.json` store under configured `data_dir` +- native job schemas and mutation helpers +- `GET /jobs` +- `GET /jobs/{job_id}` +- explicit deferral of `GET /jobs/{job_id}/results` for now, since `result` remains embedded in the job record during the first slice +- `POST /processes/{id}/execution` supports `Prefer: respond-async` +- sync fallback when `Prefer` is absent +- default in-process executor using `ThreadPoolExecutor` +- executor abstraction that allows later subprocess/custom/delegated runners +- persisted status transitions: + - `accepted` + - `running` + - `retrying` + - `successful` + - `failed` + - `cancelled` +- cooperative cancellation contract +- retry metadata and attempt tracking +- startup recovery policy for interrupted jobs + +### Out of scope for the first slice + +- ingestion-as-process migration +- `/sync` migration +- full OGC Parts 2/3 behavior +- sophisticated distributed workers or queue backends +- true resumable zarr build without `#64` + +## API Shape + +### Process execution + +`POST /processes/{id}/execution` + +- without `Prefer: respond-async` + - keep current synchronous behavior +- with `Prefer: respond-async` + - create job + - return `202 Accepted` + - return `Location: /jobs/{job_id}` + +### Job polling + +`GET /jobs/{job_id}` + +Suggested response model: + +```python +class JobRecord(BaseModel): + job_id: str = Field(serialization_alias="jobID") + process_id: str | None = Field(None, serialization_alias="processID") + type: Literal["process", "ingestion"] + status: Literal["accepted", "running", "retrying", "successful", "failed", "cancelled"] + attempt: int + max_attempts: int = Field(serialization_alias="maxAttempts") + created: datetime + started: datetime | None + finished: datetime | None + progress: JobProgress | None + request: dict[str, Any] + result: dict[str, Any] | None + error: str | None + cancel_requested: bool = Field(serialization_alias="cancelRequested") + retry_after: int | None = Field(None, serialization_alias="retryAfter") + cursor: dict[str, Any] | None + executor: str + + model_config = ConfigDict(populate_by_name=True) +``` + +With: + +```python +class JobProgress(BaseModel): + done: int | None + total: int | None + percent: float | None + message: str | None +``` + +Notes: + +- `jobID`, `processID`, `created`, `started`, and `finished` follow OGC naming +- `type`, `attempt`, `maxAttempts`, `request`, `progress`, `result`, `error`, `cancelRequested`, `retryAfter`, `cursor`, and `executor` are native extensions +- `processID` is naturally nullable for non-process jobs +- `cursor` is framework-owned storage for process-specific resume checkpoints +- internal model fields should stay snake_case; OGC-facing names should be produced through serialization aliases +- `cursor` is intended for lightweight, coarse-grained checkpoint state only; large resume metadata belongs in a sidecar file rather than in `jobs.json` +- `retryAfter` represents a delay in seconds, matching the HTTP `Retry-After` semantics rather than an absolute timestamp + +## Dispatcher and Executor Model + +Introduce one native dispatcher/runtime that: + +1. creates job record with `accepted` +2. submits worker task +3. marks `running` +4. executes target callable +5. stores `result` or `error` +6. marks final status + +For the MVP: + +- use `ThreadPoolExecutor` +- persist state to `jobs.json` +- do not block the HTTP request when async mode is requested + +This MVP has now been implemented. + +But design it around a small executor abstraction, not a hardcoded thread pool: + +```python +class ProcessExecutor(Protocol): + kind: str + def submit( + self, + job_id: str, + func: Callable[..., Any], + kwargs: dict[str, Any], + *, + max_retries: int = 0, + ) -> None: ... +``` + +This keeps the first implementation simple while leaving room for: + +- subprocess-backed execution +- plugin-provided execution +- delegated/distributed execution later + +`kind: str` is intentionally a lightweight executor identifier rather than a behavioral method. It is primarily there so the job record can capture what runner/executor was used. + +## Function Contract Guidance + +Do not force a large callable-contract refactor in the first `#111` implementation, but do converge toward one shared execution contract. + +Current process execution works via: + +```python +func(**request) +``` + +Recommendation: + +- keep current process callable shape working +- add optional progress plumbing without breaking the current contract +- add cooperative cancellation and resume hooks with safe no-op defaults +- avoid coupling the first job implementation to a large `ProcessContext` object unless it becomes necessary immediately + +Suggested target signature style: + +```python +def run_process( + *, + on_progress: ProgressCallback = lambda *_: None, + is_cancel_requested: Callable[[], bool] = lambda: False, + load_cursor: Callable[[], dict[str, Any] | None] = lambda: None, + save_cursor: Callable[[dict[str, Any]], None] = lambda _cursor: None, + **kwargs: Any, +) -> dict[str, Any]: + ... +``` + +This keeps direct local/test invocation simple while giving the job framework what it needs. + +## Ingestion and Process Sequencing + +Although the long-term target is one shared model for process execution, ingestion, and sync, the first implementation should start with **process jobs first**. + +Why: + +- `/processes` is already the cleanest native abstraction after `689` +- `resample` is a real built-in process to exercise the path +- ingestion migration is broader and touches more operational code paths + +Then bring ingestion onto the same job layer once the core job store, executor abstraction, and runtime semantics are proven. + +## Cancellation, Retry, and Resume + +These are framework concerns, but the framework should only provide **cooperative** semantics. + +The framework owns: + +- API surface +- state transitions +- persistence of retry/cancel/recovery metadata +- attempt counting and backoff +- restart recovery policy + +Individual processes own: + +- safe checkpoint boundaries +- what cursor state to save +- whether work is retryable and resumable in practice +- where cancellation checks happen + +### Cancellation + +Cancellation should not try to kill worker threads forcefully. + +Instead: + +- `DELETE /jobs/{job_id}` sets `cancelRequested = true` +- the running process checks `is_cancel_requested()` +- the process exits cleanly at a safe boundary +- the framework marks the job `cancelled` + +The job record should be retained after cancellation for auditability and later inspection. Cancellation is not the same as deleting/dismissing the record. + +This is important for download/build workflows where abrupt termination risks corrupt intermediates. + +### Retry + +Framework behavior: + +- bounded retry count +- exponential backoff, with the original `#111` issue values (`60s`, `120s`, `240s`) as the initial policy +- minimal retryable/non-retryable exception classification +- persisted `attempt` and `retryAfter` + +The current implementation now uses: + +- `retryAfter` as delay seconds +- `60s`, `120s`, `240s` retry delays + +Retry classification remains intentionally simple for now. + +### Resume + +Differentiate: + +- download resume +- Zarr build resume +- restart recovery of interrupted jobs + +True resumable Zarr build depends on `#64` and should not be implied by the first async-jobs PR. + +For framework restart recovery: + +- jobs persisted as `accepted`, `running`, or `retrying` should be examined on startup +- `accepted` is a distinct case: the job was created but may never have been picked up before process exit +- resumable jobs can be requeued +- non-resumable jobs can be marked failed with a restart-related error + +This is different from true workflow checkpointing, but it gives the framework a coherent recovery story. + +The framework-level restart recovery is now implemented. True meaningful resume for large ingest/build workflows still depends on `#64`. + +## Recommended Jira Ticket + +### Title + +Implement native async job framework for process execution + +### Summary + +Build the native job framework on top of the `CLIM-689` process surface. Add persisted jobs, async execution via `Prefer: respond-async`, job polling, framework-level cancellation/retry/recovery semantics, and a pluggable executor abstraction. Treat `CLIM-689` as already delivered; this ticket should not reopen process registry or `/processes` design. + +### Acceptance scope + +- persisted `jobs.json` store +- job schemas and mutation helpers +- `GET /jobs` +- `GET /jobs/{job_id}` +- `GET /jobs/{job_id}/results` explicitly deferred for the first slice while `result` remains embedded in `GET /jobs/{job_id}` +- `POST /processes/{id}/execution` supports async negotiation with `Prefer: respond-async` +- `202 Accepted` + `Location` header for async mode +- synchronous fallback remains supported +- `DELETE /jobs/{job_id}` for cooperative cancellation +- retry state and attempt tracking in the framework +- startup recovery policy for interrupted jobs +- executor abstraction with default in-process implementation +- built-in `resample` works through the async path + +This acceptance scope is now satisfied for the first slice. + +### Explicitly not in this ticket + +- ingestion migration +- `/sync` migration +- distributed worker backends +- full resumable zarr build without `#64` diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 00000000..79a6ad85 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from datetime import UTC, datetime + +from climate_api.ingestions.schemas import ( + ArtifactCoverage, + CoverageSpatial, + CoverageTemporal, + DatasetPublication, + DatasetRecord, + PublicationStatus, +) + + +def dataset_record(dataset_id: str) -> DatasetRecord: + return DatasetRecord( + dataset_id=dataset_id, + source_dataset_id="chirps3_precipitation_daily_w_mon_sum", + dataset_name="CHIRPS weekly precipitation", + short_name="CHIRPS weekly", + variable="precip", + period_type="daily", + units="mm", + resolution="5 km x 5 km", + source="CHIRPS v3", + source_url="https://example.com/chirps", + extent=ArtifactCoverage( + temporal=CoverageTemporal(start="2026-01-05", end="2026-01-11"), + spatial=CoverageSpatial(xmin=1.0, ymin=2.0, xmax=3.0, ymax=4.0), + ), + last_updated=datetime(2026, 1, 21, tzinfo=UTC), + links=[], + publication=DatasetPublication( + status=PublicationStatus.PUBLISHED, + published_at=datetime(2026, 1, 21, tzinfo=UTC), + ), + ) diff --git a/tests/test_jobs_api.py b/tests/test_jobs_api.py new file mode 100644 index 00000000..5e6cb398 --- /dev/null +++ b/tests/test_jobs_api.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import time +from collections.abc import Callable, Generator +from pathlib import Path +from typing import Any, cast + +import pytest +from fastapi.testclient import TestClient + +from climate_api.jobs import store as job_store +from climate_api.jobs.models import JobCancelledError, JobStatus +from climate_api.jobs.service import reset_job_service +from climate_api.processing import services as processing_services +from tests.helpers import dataset_record + + +@pytest.fixture(autouse=True) +def _temp_jobs_store(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]: + jobs_dir = tmp_path / "jobs" + monkeypatch.setattr(job_store, "JOBS_DIR", jobs_dir) + monkeypatch.setattr(job_store, "JOBS_INDEX_PATH", jobs_dir / "jobs.json") + reset_job_service() + yield + reset_job_service() + + +def _wait_for_terminal_job(client: TestClient, job_id: str, *, timeout: float = 3.0) -> dict[str, object]: + deadline = time.time() + timeout + while time.time() < deadline: + response = client.get(f"/jobs/{job_id}") + assert response.status_code == 200 + payload = response.json() + if payload["status"] in {"successful", "failed", "cancelled"}: + return payload + time.sleep(0.02) + raise AssertionError(f"Timed out waiting for job {job_id} to complete") + + +def test_async_process_execution_creates_job_and_exposes_status( + client: TestClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + processing_services, + "run_resample_process", + lambda **kwargs: ("artifact-123", dataset_record("chirps3_precipitation_daily_w_mon_sum")), + ) + + response = client.post( + "/processes/resample/execution", + headers={"Prefer": "respond-async"}, + json={ + "source_dataset_id": "chirps3_precipitation_daily", + "frequency": "W-MON", + "method": "sum", + "start": "2026-01-05", + "end": "2026-01-12", + "publish": True, + }, + ) + + assert response.status_code == 202 + payload = cast(dict[str, Any], response.json()) + job_id = payload["jobID"] + assert response.headers["Location"] == f"/jobs/{job_id}" + assert payload["processID"] == "resample" + + finished = _wait_for_terminal_job(client, job_id) + assert finished["status"] == JobStatus.SUCCESSFUL + result = cast(dict[str, Any], finished["result"]) + assert result["artifact_id"] == "artifact-123" + + jobs_response = client.get("/jobs") + assert jobs_response.status_code == 200 + jobs_payload = cast(dict[str, Any], jobs_response.json()) + assert any(item["jobID"] == job_id for item in jobs_payload["jobs"]) + + +def test_delete_job_requests_cooperative_cancellation( + client: TestClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + def fake_get_process(process_id: str) -> dict[str, object] | None: + if process_id != "slow-process": + return None + return { + "id": process_id, + "title": "Slow process", + "description": "Test cancellation", + "execution": {"function": "tests.fake.slow_process"}, + "expose": True, + "jobControlOptions": ["sync-execute", "async-execute"], + } + + def slow_process( + *, + on_progress: Callable[[int | None, int | None, str | None], None] | None = None, + is_cancel_requested: Callable[[], bool] | None = None, + **_kwargs: object, + ) -> dict[str, object]: + for step in range(20): + if is_cancel_requested is not None and is_cancel_requested(): + raise JobCancelledError() + if on_progress is not None: + on_progress(step, 20, "Working") + time.sleep(0.01) + return {"ok": True} + + monkeypatch.setattr("climate_api.processing.routes.process_registry.get_process", fake_get_process) + monkeypatch.setattr("climate_api.jobs.service.process_registry.get_process", fake_get_process) + monkeypatch.setattr( + "climate_api.data_registry.services.processes._get_dynamic_function", + lambda path: slow_process, + ) + + response = client.post("/processes/slow-process/execution", headers={"Prefer": "respond-async"}, json={}) + assert response.status_code == 202 + job_id = response.json()["jobID"] + + cancel_response = client.delete(f"/jobs/{job_id}") + assert cancel_response.status_code == 200 + assert cancel_response.json()["cancelRequested"] is True + + finished = _wait_for_terminal_job(client, job_id) + assert finished["status"] == JobStatus.CANCELLED + + +def test_delete_unknown_job_returns_404(client: TestClient) -> None: + response = client.delete("/jobs/missing-job") + + assert response.status_code == 404 + + +def test_delete_terminal_job_leaves_record_unchanged( + client: TestClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + processing_services, + "run_resample_process", + lambda **kwargs: ("artifact-123", dataset_record("chirps3_precipitation_daily_w_mon_sum")), + ) + + response = client.post( + "/processes/resample/execution", + headers={"Prefer": "respond-async"}, + json={ + "source_dataset_id": "chirps3_precipitation_daily", + "frequency": "W-MON", + "method": "sum", + "start": "2026-01-05", + }, + ) + assert response.status_code == 202 + job_id = cast(dict[str, Any], response.json())["jobID"] + + finished = _wait_for_terminal_job(client, job_id) + assert finished["status"] == JobStatus.SUCCESSFUL + + cancel_response = client.delete(f"/jobs/{job_id}") + assert cancel_response.status_code == 200 + payload = cast(dict[str, Any], cancel_response.json()) + assert payload["status"] == JobStatus.SUCCESSFUL + assert payload["cancelRequested"] is False + + +def test_async_submission_rejects_invalid_resample_input(client: TestClient) -> None: + response = client.post( + "/processes/resample/execution", + headers={"Prefer": "respond-async"}, + json={ + "source_dataset_id": "chirps3_precipitation_daily", + "frequency": "invalid", + "method": "sum", + "start": "2026-01-05", + }, + ) + + assert response.status_code == 400 + jobs_response = client.get("/jobs") + assert jobs_response.status_code == 200 + jobs_payload = cast(dict[str, Any], jobs_response.json()) + assert jobs_payload["jobs"] == [] diff --git a/tests/test_jobs_service.py b/tests/test_jobs_service.py new file mode 100644 index 00000000..fc9dee75 --- /dev/null +++ b/tests/test_jobs_service.py @@ -0,0 +1,394 @@ +from __future__ import annotations + +import time +from datetime import UTC, datetime +from pathlib import Path +from typing import Any, cast + +import pytest + +import climate_api.data_registry.services.processes as registry_processes +import climate_api.jobs.service as jobs_service +from climate_api.jobs import store as job_store +from climate_api.jobs.models import JobRecord, JobStatus +from climate_api.jobs.service import JobService, ProcessExecutor, ThreadProcessExecutor, _retry_delay_seconds + + +@pytest.fixture(autouse=True) +def _temp_jobs_store(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + jobs_dir = tmp_path / "jobs" + monkeypatch.setattr(job_store, "JOBS_DIR", jobs_dir) + monkeypatch.setattr(job_store, "JOBS_INDEX_PATH", jobs_dir / "jobs.json") + + +def _job_record(job_id: str = "job-1", *, status: JobStatus = JobStatus.ACCEPTED) -> JobRecord: + return JobRecord( + job_id=job_id, + process_id="recoverable-process", + status=status, + created_at=datetime(2026, 5, 18, tzinfo=UTC), + request={"value": 2}, + links=[], + ) + + +def test_store_create_get_and_list_round_trip() -> None: + created = job_store.create_job_record(_job_record()) + + fetched = job_store.get_job_record(created.job_id) + listed = job_store.list_job_records() + + assert fetched is not None + assert fetched.job_id == created.job_id + assert [record.job_id for record in listed] == [created.job_id] + + +def test_store_upsert_replaces_existing_record() -> None: + job_store.create_job_record(_job_record()) + + updated = _job_record(status=JobStatus.RUNNING).model_copy(update={"attempt": 1}) + job_store.upsert_job_record(updated) + + fetched = job_store.get_job_record(updated.job_id) + assert fetched is not None + assert fetched.status == JobStatus.RUNNING + assert fetched.attempt == 1 + + +def test_store_mutate_missing_job_raises_key_error() -> None: + with pytest.raises(KeyError): + job_store.mutate_job_record("missing", lambda current: current) + + +def test_retry_delay_seconds_matches_design_schedule() -> None: + assert _retry_delay_seconds(1) == 60 + assert _retry_delay_seconds(2) == 120 + assert _retry_delay_seconds(3) == 240 + assert _retry_delay_seconds(4) == 240 + + +def test_service_save_cursor_persists_checkpoint() -> None: + service = JobService() + try: + job_store.create_job_record(_job_record()) + + service.save_cursor("job-1", {"last_period": "2024-03"}) + + fetched = job_store.get_job_record("job-1") + assert fetched is not None + assert fetched.cursor == {"last_period": "2024-03"} + finally: + service.shutdown() + + +def test_service_update_progress_persists_percent() -> None: + service = JobService() + try: + job_store.create_job_record(_job_record()) + + service.update_progress("job-1", done=1, total=4, message="Working") + + fetched = job_store.get_job_record("job-1") + assert fetched is not None + assert fetched.progress.done == 1 + assert fetched.progress.total == 4 + assert fetched.progress.percent == 25.0 + assert fetched.progress.message == "Working" + finally: + service.shutdown() + + +def test_list_jobs_orders_by_created_at_descending() -> None: + service = JobService() + try: + older = _job_record("job-older").model_copy(update={"created_at": datetime(2026, 5, 17, tzinfo=UTC)}) + newer = _job_record("job-newer").model_copy(update={"created_at": datetime(2026, 5, 18, tzinfo=UTC)}) + job_store.create_job_record(older) + job_store.create_job_record(newer) + + response = service.list_jobs() + + assert [job.job_id for job in response.jobs] == ["job-newer", "job-older"] + finally: + service.shutdown() + + +def test_service_uses_executor_kind_on_created_jobs(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeExecutor: + kind = "fake-thread" + + def __init__(self) -> None: + self._delegate = ThreadProcessExecutor(max_workers=1) + + def submit(self, fn: Any, /, *args: Any, **kwargs: Any): + return self._delegate.submit(fn, *args, **kwargs) + + def shutdown(self) -> None: + self._delegate.shutdown() + + service = JobService(executor=cast(ProcessExecutor, FakeExecutor())) + try: + + def fake_get_process(process_id: str) -> dict[str, object] | None: + if process_id != "recoverable-process": + return None + return { + "id": process_id, + "title": "Recoverable process", + "execution": {"function": "tests.fake.recoverable"}, + "expose": True, + "jobControlOptions": ["sync-execute", "async-execute"], + } + + monkeypatch.setattr(jobs_service.process_registry, "get_process", fake_get_process) + monkeypatch.setattr( + registry_processes, + "_get_dynamic_function", + lambda path: lambda *, value: {"value": value}, + ) + + created = service.submit_process_job(process_id="recoverable-process", request={"value": 2}) + assert created.executor_kind == "fake-thread" + finally: + service.shutdown() + + +def test_enqueue_job_does_not_submit_duplicate_running_future() -> None: + class BlockingFuture: + def done(self) -> bool: + return False + + class RecordingExecutor: + kind = "recording" + + def __init__(self) -> None: + self.submissions = 0 + + def submit(self, fn: Any, /, *args: Any, **kwargs: Any) -> BlockingFuture: + self.submissions += 1 + return BlockingFuture() + + def shutdown(self) -> None: + return None + + executor = RecordingExecutor() + service = JobService(executor=cast(ProcessExecutor, executor)) + try: + service._enqueue_job("job-1") + service._enqueue_job("job-1") + + assert executor.submissions == 1 + finally: + service.shutdown() + + +def test_recover_pending_jobs_requeues_accepted_process(monkeypatch: pytest.MonkeyPatch) -> None: + service = JobService(max_workers=1) + try: + job_store.create_job_record(_job_record()) + + def fake_get_process(process_id: str) -> dict[str, object] | None: + if process_id != "recoverable-process": + return None + return { + "id": process_id, + "title": "Recoverable process", + "execution": {"function": "tests.fake.recoverable"}, + "expose": True, + "jobControlOptions": ["sync-execute", "async-execute"], + } + + monkeypatch.setattr("climate_api.jobs.service.process_registry.get_process", fake_get_process) + monkeypatch.setattr( + "climate_api.data_registry.services.processes._get_dynamic_function", + lambda path: lambda *, value: {"value": value * 2}, + ) + + service.recover_pending_jobs() + + for _ in range(100): + record = job_store.get_job_record("job-1") + if record is not None and record.status == JobStatus.SUCCESSFUL: + break + time.sleep(0.01) + else: + raise AssertionError("Recovered job did not complete") + + record = job_store.get_job_record("job-1") + assert record is not None + assert record.status == JobStatus.SUCCESSFUL + assert record.result == {"value": 4} + finally: + service.shutdown() + + +def test_recover_pending_jobs_requeues_retrying_process(monkeypatch: pytest.MonkeyPatch) -> None: + service = JobService(max_workers=1) + try: + job_store.create_job_record(_job_record(status=JobStatus.RETRYING)) + + def fake_get_process(process_id: str) -> dict[str, object] | None: + if process_id != "recoverable-process": + return None + return { + "id": process_id, + "title": "Recoverable process", + "execution": {"function": "tests.fake.recoverable"}, + "expose": True, + "jobControlOptions": ["sync-execute", "async-execute"], + } + + monkeypatch.setattr("climate_api.jobs.service.process_registry.get_process", fake_get_process) + monkeypatch.setattr( + "climate_api.data_registry.services.processes._get_dynamic_function", + lambda path: lambda *, value: {"value": value * 3}, + ) + + service.recover_pending_jobs() + + for _ in range(100): + record = job_store.get_job_record("job-1") + if record is not None and record.status == JobStatus.SUCCESSFUL: + break + time.sleep(0.01) + else: + raise AssertionError("Recovered retrying job did not complete") + + record = job_store.get_job_record("job-1") + assert record is not None + assert record.status == JobStatus.SUCCESSFUL + assert record.result == {"value": 6} + finally: + service.shutdown() + + +def test_recover_pending_jobs_requeues_running_process_without_burning_attempt( + monkeypatch: pytest.MonkeyPatch, +) -> None: + service = JobService(max_workers=1) + try: + job_store.create_job_record(_job_record(status=JobStatus.RUNNING).model_copy(update={"attempt": 1})) + + def fake_get_process(process_id: str) -> dict[str, object] | None: + if process_id != "recoverable-process": + return None + return { + "id": process_id, + "title": "Recoverable process", + "execution": {"function": "tests.fake.recoverable"}, + "expose": True, + "jobControlOptions": ["sync-execute", "async-execute"], + } + + monkeypatch.setattr("climate_api.jobs.service.process_registry.get_process", fake_get_process) + monkeypatch.setattr( + "climate_api.data_registry.services.processes._get_dynamic_function", + lambda path: lambda *, value: {"value": value * 5}, + ) + + service.recover_pending_jobs() + + for _ in range(100): + record = job_store.get_job_record("job-1") + if record is not None and record.status == JobStatus.SUCCESSFUL: + break + time.sleep(0.01) + else: + raise AssertionError("Recovered running job did not complete") + + record = job_store.get_job_record("job-1") + assert record is not None + assert record.status == JobStatus.SUCCESSFUL + assert record.result == {"value": 10} + assert record.attempt == 1 + finally: + service.shutdown() + + +def test_retrying_job_exhausts_attempts_and_fails(monkeypatch: pytest.MonkeyPatch) -> None: + service = JobService(max_workers=1) + try: + + def fake_get_process(process_id: str) -> dict[str, object] | None: + if process_id != "recoverable-process": + return None + return { + "id": process_id, + "title": "Recoverable process", + "execution": {"function": "tests.fake.failing"}, + "expose": True, + "jobControlOptions": ["sync-execute", "async-execute"], + } + + monkeypatch.setattr("climate_api.jobs.service.process_registry.get_process", fake_get_process) + monkeypatch.setattr( + "climate_api.data_registry.services.processes._get_dynamic_function", + lambda path: lambda *, value: (_ for _ in ()).throw(RuntimeError(f"boom:{value}")), + ) + monkeypatch.setattr(JobService, "_sleep_for_retry", lambda self, job_id, seconds: True) + + created = service.submit_process_job(process_id="recoverable-process", request={"value": 2}, max_attempts=2) + + for _ in range(100): + record = job_store.get_job_record(created.job_id) + if record is not None and record.status == JobStatus.FAILED: + break + time.sleep(0.01) + else: + raise AssertionError("Retrying job did not fail in time") + + record = job_store.get_job_record(created.job_id) + assert record is not None + assert record.status == JobStatus.FAILED + assert record.attempt == 2 + assert record.error is not None + assert record.error.type == "RuntimeError" + finally: + service.shutdown() + + +def test_retry_wait_honors_cancellation_request(monkeypatch: pytest.MonkeyPatch) -> None: + service = JobService(max_workers=1) + try: + + def fake_get_process(process_id: str) -> dict[str, object] | None: + if process_id != "recoverable-process": + return None + return { + "id": process_id, + "title": "Recoverable process", + "execution": {"function": "tests.fake.failing"}, + "expose": True, + "jobControlOptions": ["sync-execute", "async-execute"], + } + + monkeypatch.setattr("climate_api.jobs.service.process_registry.get_process", fake_get_process) + monkeypatch.setattr( + "climate_api.data_registry.services.processes._get_dynamic_function", + lambda path: lambda *, value: (_ for _ in ()).throw(RuntimeError(f"boom:{value}")), + ) + monkeypatch.setattr( + JobService, + "_sleep_for_retry", + lambda self, job_id, seconds: ( + service.request_cancellation(job_id), + False, + )[1], + ) + + created = service.submit_process_job(process_id="recoverable-process", request={"value": 2}, max_attempts=2) + + for _ in range(100): + record = job_store.get_job_record(created.job_id) + if record is not None and record.status == JobStatus.CANCELLED: + break + time.sleep(0.01) + else: + raise AssertionError("Retrying job did not cancel in time") + + record = job_store.get_job_record(created.job_id) + assert record is not None + assert record.status == JobStatus.CANCELLED + assert record.progress.message == "Cancelled before retry execution resumed" + finally: + service.shutdown() diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 00000000..b8982ad8 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import pytest +from fastapi import FastAPI + +import climate_api.main as main + + +@pytest.mark.anyio +async def test_lifespan_recovers_jobs_and_shuts_down(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[str] = [] + + class FakeJobService: + def recover_pending_jobs(self) -> None: + calls.append("recover") + + def shutdown(self) -> None: + calls.append("shutdown") + + monkeypatch.setattr(main, "get_job_service", lambda: FakeJobService()) + + async with main._lifespan(FastAPI()): + assert calls == ["recover"] + + assert calls == ["recover", "shutdown"] diff --git a/tests/test_process_registry.py b/tests/test_process_registry.py index f909751b..f1e62397 100644 --- a/tests/test_process_registry.py +++ b/tests/test_process_registry.py @@ -21,7 +21,7 @@ def test_builtin_resample_has_execution_function(monkeypatch: pytest.MonkeyPatch assert resample is not None assert resample["title"] == "Temporal resampling" assert resample["execution"]["function"] == "climate_api.processing.services.execute_resample" - assert resample["jobControlOptions"] == ["sync-execute"] + assert resample["jobControlOptions"] == ["sync-execute", "async-execute"] def test_get_process_returns_none_for_unknown_id(monkeypatch: pytest.MonkeyPatch) -> None: @@ -31,6 +31,24 @@ def test_get_process_returns_none_for_unknown_id(monkeypatch: pytest.MonkeyPatch assert process_registry.get_process("does_not_exist") is None +def test_get_process_function_returns_callable_for_registered_process(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + process_registry, + "get_process", + lambda process_id: { + "id": process_id, + "title": "Callable process", + "execution": {"function": "mypackage.execute.run"}, + "expose": True, + }, + ) + monkeypatch.setattr(process_registry, "_get_dynamic_function", lambda path: {"path": path}) + + resolved = process_registry.get_process_function("callable-process") + + assert resolved == {"path": "mypackage.execute.run"} + + def test_process_registry_rejects_missing_title(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: processes_subdir = tmp_path / "processes" processes_subdir.mkdir() diff --git a/tests/test_processing_routes.py b/tests/test_processing_routes.py index 587a1987..25262c6c 100644 --- a/tests/test_processing_routes.py +++ b/tests/test_processing_routes.py @@ -1,44 +1,12 @@ -from datetime import UTC, datetime from pathlib import Path from typing import Any import pytest from fastapi.testclient import TestClient -from climate_api.ingestions.schemas import ( - ArtifactCoverage, - CoverageSpatial, - CoverageTemporal, - DatasetPublication, - DatasetRecord, - PublicationStatus, -) +from climate_api.ingestions.schemas import DatasetRecord from climate_api.processing import services as processing_services - - -def _dataset_record(dataset_id: str) -> DatasetRecord: - return DatasetRecord( - dataset_id=dataset_id, - source_dataset_id="chirps3_precipitation_daily_w_mon_sum", - dataset_name="CHIRPS weekly precipitation", - short_name="CHIRPS weekly", - variable="precip", - period_type="daily", - units="mm", - resolution="5 km x 5 km", - source="CHIRPS v3", - source_url="https://example.com/chirps", - extent=ArtifactCoverage( - temporal=CoverageTemporal(start="2026-01-05", end="2026-01-11"), - spatial=CoverageSpatial(xmin=1.0, ymin=2.0, xmax=3.0, ymax=4.0), - ), - last_updated=datetime(2026, 1, 21, tzinfo=UTC), - links=[], - publication=DatasetPublication( - status=PublicationStatus.PUBLISHED, - published_at=datetime(2026, 1, 21, tzinfo=UTC), - ), - ) +from tests.helpers import dataset_record def test_get_processes_lists_registered_processes(client: TestClient) -> None: @@ -57,7 +25,7 @@ def test_get_process_detail_returns_public_metadata(client: TestClient) -> None: payload = response.json() assert payload["id"] == "resample" assert payload["title"] == "Temporal resampling" - assert payload["jobControlOptions"] == ["sync-execute"] + assert payload["jobControlOptions"] == ["sync-execute", "async-execute"] assert "execution_function" not in payload assert payload["inputs"]["source_dataset_id"]["required"] is True assert payload["inputs"]["publish"]["default"] is True @@ -155,7 +123,7 @@ def test_post_resample_execution_returns_completed_response( monkeypatch.setattr( processing_services, "run_resample_process", - lambda **kwargs: ("artifact-123", _dataset_record("chirps3_precipitation_daily_w_mon_sum")), + lambda **kwargs: ("artifact-123", dataset_record("chirps3_precipitation_daily_w_mon_sum")), ) response = client.post( @@ -177,6 +145,60 @@ def test_post_resample_execution_returns_completed_response( assert payload["dataset"]["dataset_id"] == "chirps3_precipitation_daily_w_mon_sum" +def test_post_process_execution_honors_case_insensitive_prefer_tokens( + client: TestClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + processing_services, + "run_resample_process", + lambda **kwargs: ("artifact-123", dataset_record("chirps3_precipitation_daily_w_mon_sum")), + ) + + response = client.post( + "/processes/resample/execution", + headers={"Prefer": "Respond-Async, wait=10"}, + json={ + "source_dataset_id": "chirps3_precipitation_daily", + "frequency": "W-MON", + "method": "sum", + "start": "2026-01-05", + "end": "2026-01-12", + "publish": True, + }, + ) + + assert response.status_code == 202 + payload = response.json() + assert payload["status"] in {"accepted", "running", "successful"} + assert response.headers["Location"].startswith("/jobs/") + + +def test_post_process_execution_rejects_async_when_process_is_sync_only( + client: TestClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "climate_api.processing.routes.process_registry.get_process", + lambda process_id: { + "id": process_id, + "title": "Sync only process", + "execution": {"function": "mypackage.sync_only.execute"}, + "expose": True, + "jobControlOptions": ["sync-execute"], + }, + ) + + response = client.post( + "/processes/sync-only/execution", + headers={"Prefer": "respond-async"}, + json={}, + ) + + assert response.status_code == 409 + assert response.json()["detail"] == "Process 'sync-only' does not support async execution" + + def test_post_internal_process_execution_returns_404(client: TestClient, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr( "climate_api.processing.routes.process_registry.get_process", @@ -202,7 +224,7 @@ def test_post_resample_execution_passes_params_to_service( def fake_run_resample_process(**kwargs: object) -> tuple[str, DatasetRecord]: captured.update(kwargs) - return "artifact-456", _dataset_record("chirps3_precipitation_daily_w_mon_sum") + return "artifact-456", dataset_record("chirps3_precipitation_daily_w_mon_sum") monkeypatch.setattr(processing_services, "run_resample_process", fake_run_resample_process) @@ -245,6 +267,21 @@ def test_post_resample_execution_returns_400_for_invalid_frequency( assert response.status_code == 400 +def test_post_resample_execution_returns_400_for_null_frequency(client: TestClient) -> None: + response = client.post( + "/processes/resample/execution", + json={ + "source_dataset_id": "chirps3_precipitation_daily", + "frequency": None, + "method": "sum", + "start": "2026-01-01", + }, + ) + + assert response.status_code == 400 + assert response.json()["detail"] == "frequency is required" + + def test_post_resample_execution_returns_400_for_unsupported_method( client: TestClient, monkeypatch: pytest.MonkeyPatch,