Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a native, persisted async job framework for OGC-style process execution in the Climate API, enabling Prefer: respond-async negotiation for /processes/{id}/execution and wiring the existing resample process through the new job runtime.
Changes:
- Added a JSON-backed persisted job store (
jobs.json), job models, runtime service (thread-backed executor), and/jobslist/detail/cancel endpoints. - Implemented async negotiation for
POST /processes/{id}/executionviaPrefer: respond-asyncreturning202 Accepted+Location: /jobs/{job_id}, with synchronous fallback when absent. - Added runtime features: cooperative cancellation, retry metadata/backoff, and startup recovery that requeues pending jobs; updated tests and internal design notes accordingly.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_processing_routes.py | Updates process tests for async jobControlOptions and shared dataset fixture. |
| tests/test_process_registry.py | Adjusts registry assertions to expect async-execute support. |
| tests/test_jobs_service.py | Adds unit tests for job store/service, retry delays, and recovery behavior. |
| tests/test_jobs_api.py | Adds API tests for async submission, polling, and cooperative cancellation. |
| tests/helpers.py | Introduces shared dataset_record() test helper. |
| docs/internal/issue-111-async-jobs-design.md | Adds internal design note aligning issue #111 with current implementation. |
| docs/internal/clim-703-implementation-review.md | Adds internal implementation review notes and validation status. |
| climate_api/processing/services.py | Splits resample request validation and adds job-runtime hooks (progress/cancel/cursor) to execution. |
| climate_api/processing/routes.py | Adds Prefer-based async negotiation and request validation before dispatch. |
| climate_api/main.py | Registers /jobs router and adds lifespan hook for startup recovery. |
| climate_api/jobs/store.py | Implements portalocker-protected JSON persistence for job records. |
| climate_api/jobs/service.py | Implements JobService runtime, executor abstraction, retry/cancel/recovery, and process invocation plumbing. |
| climate_api/jobs/routes.py | Adds /jobs list/detail/cancel HTTP routes. |
| climate_api/jobs/models.py | Adds Pydantic job schemas (record/progress/error/status) and cancellation primitive. |
| climate_api/jobs/init.py | Introduces jobs package marker. |
| climate_api/data/processes/resample.yaml | Declares async-execute support for resample via jobControlOptions. |
| request: dict[str, Any] = Body(...), | ||
| prefer: str | None = Header(default=None), | ||
| ) -> Any: | ||
| """Dispatch to a registered process execution function by process id.""" | ||
| process = _get_public_process_or_404(process_id) | ||
| _validate_process_request(process, request) | ||
| if prefer is not None and "respond-async" in prefer: | ||
| job = get_job_service().submit_process_job(process_id=process_id, request=request) | ||
| response.status_code = 202 | ||
| response.headers["Location"] = f"/jobs/{job.job_id}" | ||
| return job |
| get_job_service().recover_pending_jobs() | ||
| yield |
| logger.exception("Job %s failed", job_id) | ||
| error = JobError(type=type(exc).__name__, message=str(exc)) | ||
| if started.attempt < started.max_attempts: | ||
| retry_after = _retry_delay_seconds(started.attempt) | ||
| store.mutate_job_record( | ||
| job_id, | ||
| lambda latest: latest.model_copy( | ||
| update={ | ||
| "status": JobStatus.RETRYING, | ||
| "retry_after": retry_after, | ||
| "error": error, | ||
| "progress": JobProgress(message="Retry scheduled"), | ||
| } | ||
| ), | ||
| ) | ||
| time.sleep(retry_after) | ||
| continue |
turban
left a comment
There was a problem hiding this comment.
Review
CI passing. Good coverage for a new subsystem. Two bugs should be fixed before merge; the rest can be follow-ups.
Bugs
1. Store key mismatch — store.py:594, 603, 631
JobRecord serializes with camelCase aliases ("jobID"), but the lookup code uses the Python field name:
if raw.get("job_id") == job_id: # never matches — disk has "jobID"Every live lookup into the persisted store silently returns None. Fix: serialize with model_dump(by_alias=False), or change all raw.get(…) calls to use "jobID". list_job_records works because it goes through model_validate, but get_job_record and mutate_job_record are broken after any round-trip through disk.
2. Lambda cell-var capture in loops — service.py:350, 479–490
Lambdas inside while True and for record in … loops close over loop variables by reference. By the time the lambda executes inside mutate_job_record, the variable may have been rebound to a later iteration's value. The recovery path could persist the wrong job_id; the retry mutation could store the wrong error or delay.
Medium
3. Prefer header parsed case-sensitively — processing/routes.py:131
RFC 7240 requires case-insensitive matching and comma-separated directives. "Respond-Async" silently falls through to sync execution. Suggest:
"respond-async" in [t.strip().lower() for t in (prefer or "").split(",")]4. Startup recovery increments attempt count before work runs — service.py:344–361
A job interrupted mid-run recovers with status=RUNNING, attempt=N. The loop bumps it to N+1 before any work starts. With max_attempts=1 a single server restart exhausts the retry budget. The recovered job should reset to ACCEPTED or at least not count the restart as a failed attempt.
5. _invoke_process calls a private function across module boundary — service.py:515
func = process_registry._get_dynamic_function(process["execution"]["function"])This couples the job service to a private implementation detail of the registry. Worth exposing as a public get_process_function(process_id) helper.
6. Per-process validation hardcoded by ID in the route — processing/routes.py:769
if process.get("id") == "resample":
processing_services.validate_resample_request(…)This will grow into a switch statement as processes are added. Validation should live in the process descriptor or callable, not the generic route handler.
Low / follow-up
time.sleep(retry_after)blocks a thread-pool worker for up to 240s. Withmax_workers=4and three jobs retrying, the pool stalls. The internal review doc flags this as a follow-up, but it is a correctness risk at moderate load.JOBS_DIRandJOBS_INDEX_PATHresolved at module import time makes test monkeypatching fragile — any code that captures them before patching runs sees the wrong path._ = load_cursor, save_cursorinprocessing/services.py:880— use# noqa: F841or drop the assignment.- Internal design doc contains "ready for PR" language that will be stale after merge.
Test gaps
- No test for the
RUNNINGrecovery path (onlyACCEPTEDandRETRYINGare covered). - No test for the concurrent-enqueue guard in
_enqueue_job. - No test for
list_jobsordering (created_at descending).
| try: | ||
| _offset = pd.tseries.frequencies.to_offset(frequency) | ||
| except ValueError: | ||
| _offset = None | ||
| if _offset is None: |
| process = _get_public_process_or_404(process_id) | ||
| _validate_process_request(process, request) | ||
| if _prefer_respond_async(prefer): | ||
| job = get_job_service().submit_process_job(process_id=process_id, request=request) | ||
| response.status_code = 202 | ||
| response.headers["Location"] = f"/jobs/{job.job_id}" | ||
| return job |
| def _validate_process_request(process: dict[str, Any], request: dict[str, Any]) -> None: | ||
| _validate_required_process_inputs(process, request) | ||
| if process.get("id") == "resample": | ||
| processing_services.validate_resample_request(**request) | ||
|
|
| def _validate_required_process_inputs(process: dict[str, Any], request: dict[str, Any]) -> None: | ||
| raw_inputs = process.get("inputs") | ||
| if not isinstance(raw_inputs, dict): | ||
| return | ||
| missing = [ | ||
| key | ||
| for key, value in raw_inputs.items() | ||
| if isinstance(key, str) and isinstance(value, dict) and value.get("required") is True and key not in request | ||
| ] | ||
| if missing: | ||
| joined = ", ".join(sorted(missing)) | ||
| raise HTTPException(status_code=400, detail=f"Missing required process inputs: {joined}") |
| if not frequency: | ||
| raise HTTPException(status_code=400, detail="frequency is required") | ||
| if not start: | ||
| raise HTTPException(status_code=400, detail="start is required") |
| if record.status == JobStatus.RUNNING: | ||
| store.mutate_job_record( | ||
| record.job_id, | ||
| lambda current: current.model_copy( | ||
| update={ | ||
| "status": JobStatus.ACCEPTED, | ||
| "attempt": max(0, current.attempt - 1), | ||
| "finished_at": None, | ||
| "retry_after": None, | ||
| "error": None, | ||
| "progress": JobProgress(message="Requeued after restart during execution"), | ||
| } | ||
| ), |
Summary
Implements the first native async job-framework slice for process execution.
This adds persisted jobs, async execution negotiation for native processes, cooperative cancellation, retry and startup recovery, and wires
resamplethrough the framework as the first concrete process.Changes
jobs.jsonstore/jobslist/detail/cancel endpointsPOST /processes/{id}/executionPrefer: respond-async202 Accepted+LocationPreferis absentretryAfterresamplethrough the native job framework#111/CLIM-703Out of scope
#64