Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions climate_api/data/processes/resample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- resampling
jobControlOptions:
- sync-execute
- async-execute
inputs:
source_dataset_id:
type: string
Expand Down
8 changes: 8 additions & 0 deletions climate_api/data_registry/services/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ def get_process(process_id: str) -> dict[str, Any] | None:
return {p["id"]: p for p in list_processes()}.get(process_id)


def get_process_function(process_id: str) -> Any:
"""Import and return the execution callable for one registered process id."""
process = get_process(process_id)
if process is None:
raise ValueError(f"Unknown process '{process_id}'")
return _get_dynamic_function(process["execution"]["function"])


def _load_builtin_processes() -> list[dict[str, Any]]:
"""Load built-in process definitions from package data via importlib.resources."""
pkg = importlib.resources.files("climate_api") / "data" / "processes"
Expand Down
1 change: 1 addition & 0 deletions climate_api/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Native job execution framework for process runs."""
94 changes: 94 additions & 0 deletions climate_api/jobs/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Pydantic schemas and execution primitives for native jobs."""

from __future__ import annotations

from datetime import datetime
from enum import StrEnum
from typing import Any

from pydantic import BaseModel, ConfigDict, Field


class JobCancelledError(Exception):
"""Raised by a job implementation when cooperative cancellation is honored."""


class JobStatus(StrEnum):
"""Persisted lifecycle states for native jobs."""

ACCEPTED = "accepted"
RUNNING = "running"
RETRYING = "retrying"
SUCCESSFUL = "successful"
FAILED = "failed"
CANCELLED = "cancelled"


class JobType(StrEnum):
"""Native job categories."""

PROCESS = "process"


class JobLink(BaseModel):
"""Hypermedia link for a job resource."""

href: str
rel: str
title: str | None = None


class JobProgress(BaseModel):
"""Progress details reported by a running job."""

done: int | None = None
total: int | None = None
percent: float | None = None
message: str | None = None


class JobError(BaseModel):
"""Serialized error details for a failed job."""

type: str
message: str


class JobRecord(BaseModel):
"""Persisted native job record."""

model_config = ConfigDict(populate_by_name=True)

job_id: str = Field(validation_alias="jobID", serialization_alias="jobID")
process_id: str = Field(validation_alias="processID", serialization_alias="processID")
type: JobType = JobType.PROCESS
status: JobStatus
created_at: datetime = Field(validation_alias="createdAt", serialization_alias="createdAt")
started_at: datetime | None = Field(default=None, validation_alias="startedAt", serialization_alias="startedAt")
finished_at: datetime | None = Field(default=None, validation_alias="finishedAt", serialization_alias="finishedAt")
attempt: int = 0
max_attempts: int = Field(default=1, validation_alias="maxAttempts", serialization_alias="maxAttempts")
executor_kind: str = Field(default="thread", validation_alias="executorKind", serialization_alias="executorKind")
request: dict[str, Any] = Field(default_factory=dict)
progress: JobProgress = Field(default_factory=JobProgress)
result: Any | None = None
error: JobError | None = None
cancel_requested: bool = Field(
default=False,
validation_alias="cancelRequested",
serialization_alias="cancelRequested",
)
retry_after: int | None = Field(
default=None,
validation_alias="retryAfter",
serialization_alias="retryAfter",
)
cursor: dict[str, Any] | None = None
links: list[JobLink] = Field(default_factory=list)


class JobListResponse(BaseModel):
"""Envelope response for native jobs."""

jobs: list[JobRecord] = Field(default_factory=list)
links: list[JobLink] = Field(default_factory=list)
26 changes: 26 additions & 0 deletions climate_api/jobs/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""HTTP routes for native job status and cancellation."""

from fastapi import APIRouter

from climate_api.jobs.models import JobListResponse, JobRecord
from climate_api.jobs.service import get_job_service

router = APIRouter()


@router.get("", response_model=JobListResponse)
def list_jobs() -> JobListResponse:
"""Return all persisted native jobs."""
return get_job_service().list_jobs()


@router.get("/{job_id}", response_model=JobRecord)
def get_job(job_id: str) -> JobRecord:
"""Return one native job."""
return get_job_service().get_job_or_404(job_id)


@router.delete("/{job_id}", response_model=JobRecord)
def cancel_job(job_id: str) -> JobRecord:
"""Request cooperative cancellation for one native job."""
return get_job_service().request_cancellation(job_id)
Loading
Loading