Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
7c16f8a
feat: per-period Icechunk ingest — protocol, orchestrator, ERA5-Land …
turban May 19, 2026
28337be
fix: per-period commit and hourly period filter in ERA5-Land plugin
turban May 19, 2026
1285d46
fix: strip zarr v2 Blosc encoding before writing to icechunk store
turban May 19, 2026
8855cfa
fix: pin time encoding to hours epoch so hourly appends land on corre…
turban May 19, 2026
67d1ccf
feat: store-based sync for Icechunk — use read_committed_period_ids a…
turban May 19, 2026
438a2fc
feat: rechunk Icechunk store in-place after initial ingest (time: 1 →…
turban May 19, 2026
fe1bb5f
feat: CHIRPS3 and WorldPop IngestionPlugin implementations
turban May 19, 2026
a841eb7
feat: Icechunk support in data accessor, zarr-serving, STAC and resample
turban May 19, 2026
bd713c7
feat: GridSpec.extra_dims and expire_snapshots after ingest
turban May 19, 2026
3cdba2c
docs: update for IngestionPlugin, Icechunk ingest, and built-in plugins
turban May 19, 2026
51b9703
refactor: remove ingestion.function — ingestion.plugin is the only path
turban May 19, 2026
dfbc9af
fix: strip CF encoding attrs (add_offset, scale_factor) from WorldPop…
turban May 19, 2026
dcd0ca9
fix: detect actual data CRS for STAC proj:code and coverage spatial_w…
turban May 19, 2026
9c7ad60
fix: pass overwrite flag through _create_icechunk_artifact to upsert
turban May 19, 2026
a8f8d1c
fix: Icechunk zarr key serving and STAC zarr_format detection
turban May 19, 2026
bb2173c
fix: WorldPop nodata handling — mask -99999 to NaN during ingest
turban May 19, 2026
53789a9
chore: remove display.nodata from worldpop dataset
turban May 19, 2026
13e1309
Code cleaning
turban May 19, 2026
de92cb5
fix: resolve all mypy and pyright lint errors
turban May 19, 2026
254a02c
fix: treat empty icechunk skeleton as first write in orchestrator
turban May 19, 2026
d7969bd
fix: update tests and guard _detect_dataset_crs after encoding/nodata…
turban May 19, 2026
f5ae74d
fix: guard math.isnan call against None for pyright
turban May 19, 2026
f09c870
fix: address Copilot PR review — resource leaks, prefer_zarr, GridSpe…
turban May 19, 2026
9e7c85e
fix: strip CF attrs from zarr encoding in rechunk_store
turban May 19, 2026
0a129e1
fix: address second Copilot PR review — task leak, subscript, buffer,…
turban May 19, 2026
296e0f6
fix: address third Copilot PR review — cursor safety, overwrite, ds.c…
turban May 19, 2026
31bd121
fix: transforms, expire_snapshots timing, sync duplicates, and protoc…
turban May 19, 2026
469a073
feat: derive sync availability from plugin.periods() instead of lates…
turban May 19, 2026
1c27272
feat: remove chirps3/worldpop latest_available_function — plugin.peri…
turban May 19, 2026
159539b
refactor: remove legacy latest_available_function dispatch — plugin.p…
turban May 19, 2026
e3b6992
refactor: delete climate_api/providers package — no longer contains a…
turban May 19, 2026
36984d1
fix: address fourth Copilot PR review — task leak, GeoZarr attrs, Pro…
turban May 19, 2026
f569837
feat: multiscale pyramid support for Icechunk ingest
turban May 19, 2026
122f038
chore: remove dhis2eo dependency — never imported
turban May 19, 2026
37f8e74
refactor: remove legacy build_dataset_zarr and /build_zarr HTTP route
turban May 19, 2026
52ba79e
refactor: remove legacy netcdf/flat-zarr code paths after Icechunk mi…
turban May 19, 2026
c3ce2ef
docs: remove automatic reprojection references — plugin CRS is the st…
turban May 20, 2026
cd46ebd
fix: address Copilot review — pyramid append guard, WorldPop validati…
turban May 20, 2026
a9e842f
feat: POST /ingestions Prefer: respond-async + progress callbacks
turban May 20, 2026
85e6b19
fix: pyramid map rendering — strip sharding, serve root for zoom sele…
turban May 20, 2026
9d81dca
docs: update STAC pyramid asset href behaviour for Icechunk stores
turban May 20, 2026
731cd3a
docs: add Icechunk section to zarr_and_geozarr.md
turban May 20, 2026
9a61159
fix: handle bare group paths in Icechunk zarr serving; fix legacy rem…
turban May 20, 2026
5e76d6c
refactor: simplify examples — spatial dims are always x/y
turban May 20, 2026
f79c387
refactor: remove _upgrade_legacy_record
turban May 20, 2026
9ce61f0
refactor: remove legacy code — prefer_zarr, NETCDF format, artifact.p…
turban May 20, 2026
1cfc7a1
refactor: sync IngestionPlugin protocol, enumerate_periods utility, c…
turban May 20, 2026
ba36acd
fix: always read committed periods from store on resume; sync plan_sy…
turban May 20, 2026
39f2962
style: remove trailing whitespace from blank lines in test_datasets.py
turban May 20, 2026
6fd343a
style: ruff format
turban May 20, 2026
f30b292
fix: resolve mypy errors in store, sync_engine, and services
turban May 20, 2026
548cd6c
fix: resolve pyright errors — remove stale path= field, suppress side…
turban May 20, 2026
b911c47
fix: update FakePlugin/EmptyPlugin in tests to use sync periods()
turban May 20, 2026
c03fb2e
fix: correct async→sync docs; guard open_or_create_repo against missi…
turban May 20, 2026
981966a
fix: address review findings — remove load_cursor, fix ERA5 probe, cl…
turban May 20, 2026
eac91cf
fix: expose format as "zarr" in ZarrListing responses
turban May 20, 2026
f9c004a
feat: live progress tracking in manage UI (ingest and sync)
turban May 20, 2026
a7bc5b7
fix: remove extent field from ingest form (single-extent instances)
turban May 20, 2026
1e4a250
fix: add SSE headers and keepalive to unblock browser buffering
turban May 20, 2026
c03ca95
perf: cache ERA5-Land remote store on plugin instance
turban May 20, 2026
9d99dc0
fix: add python-multipart as explicit dependency
turban May 20, 2026
ad867d1
fix: expand scalar time coord before passing to xstac
turban May 20, 2026
ce1760d
fix: hourly end date normalizes to T23 when given a bare date
turban May 20, 2026
cdcc94a
fix: get_time_dim checks actual dims, not just coordinates
turban May 20, 2026
2067028
fix: detect WGS84 from coordinate units when spatial_ref is absent
turban May 20, 2026
cf5a443
fix: detect pyramid Icechunk stores for correct STAC zarr href
turban May 20, 2026
0e95ec4
fix: reduce CHIRPS3 max_concurrency to 1 to avoid rate-limit bans
turban May 20, 2026
464299f
fix: handle timeless datasets in coverage_from_open_dataset
turban May 20, 2026
fa44c57
fix: detect completed timeless stores in read_committed_period_ids
turban May 20, 2026
54048df
fix: allow null temporal coverage for static (timeless) datasets
turban May 20, 2026
99db6fc
fix: handle timeless datasets in STAC collection building
turban May 20, 2026
a830268
fix: skip xstac for timeless stores, build spatial cube:dimensions ma…
turban May 20, 2026
2640ccb
fix: guard _override_temporal_extent_from_artifact against None start…
turban May 20, 2026
258b067
fix: expose pyramid root URL to zarr-layer for multiscale selection
turban May 20, 2026
7c20b97
feat: palette LUT support for categorical datasets
turban May 20, 2026
3c53f45
Merge remote-tracking branch 'origin/main' into feat/icechunk-ingest
turban May 20, 2026
4c22d02
fix: resolve ruff lint errors after merge with main
turban May 20, 2026
6591129
fix: resolve all mypy type errors
turban May 20, 2026
f70d3e7
fix: initialise da=None before retry loop to satisfy pyright
turban May 20, 2026
eb2736e
docs: remove outdated single-CRS constraint from zarr_and_geozarr.md
turban May 20, 2026
2c56fbb
docs: move Store layout on disk section after Icechunk section
turban May 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The DHIS2 Climate API is a FastAPI-based REST API that downloads, processes, and

Key concepts:

- **Dataset templates** — YAML files in `data/datasets/` describing a data source (variable, period type, download function). These are blueprints.
- **Dataset templates** — YAML files in `data/datasets/` describing a data source (variable, period type, ingestion plugin). These are blueprints.
- **Artifacts / managed datasets** — ingested instances of a template for a specific spatial extent and time range. Exposed under `/datasets` and `/zarr/{dataset_id}`.
- **Extent** — a single named spatial bounding box configured at instance setup time (`id`, `bbox`, optional `country_code`). Exposed at `GET /extent`.
- **GeoZarr stores** — datasets are stored as chunked Zarr v3 archives with GeoZarr spatial attributes. Flat stores for small extents; multiscale pyramids for large ones. Served chunk-by-chunk over HTTP with no specialised server middleware.
Expand Down Expand Up @@ -44,18 +44,17 @@ The `.env` file is required for `make run` and `make openapi`. Copy `.env.exampl

## Dataset templates

Each YAML in `data/datasets/` defines a dataset template. The `ingestion` block controls download and zarr build behaviour:
Each YAML in `data/datasets/` defines a dataset template. The `ingestion` block specifies the plugin class that streams data directly into the Icechunk store:

```yaml
ingestion:
function: dhis2eo.data.worldpop.pop_total.yearly.download
default_params: {} # passed to the download function
plugin: climate_api.ingest.plugins.worldpop.WorldPopPlugin
params:
version: global2
```

`build_dataset_zarr` in `data_manager/downloader.py` builds a multiscale Zarr pyramid when the spatial dimensions exceed 2048×2048 pixels; otherwise it writes a flat chunked zarr with chunk sizes derived from the dataset's temporal resolution.

The ingestion interface is being redesigned as a plugin protocol (see GitHub issue #64) — the `ingestion.function` convention will be replaced by a three-method async plugin (`probe`, `periods`, `fetch_period`).

## pygeoapi

pygeoapi is mounted at `/ogcapi` as a sub-application. Its config is generated dynamically from published artifacts by `publications/services.py` and written to `data/pygeoapi/pygeoapi-config.yml`.
Expand Down
62 changes: 2 additions & 60 deletions climate_api/client.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
"""Lightweight client for discovering and opening published Climate API datasets."""

import os
from urllib.parse import urlparse

import httpx
import xarray as xr

_FALLBACK_BASE_URL = "http://127.0.0.1:8000"
_DEFAULT_TIMEOUT = 30.0


def _default_base_url() -> str:
return os.environ.get("CLIMATE_API_BASE_URL", _FALLBACK_BASE_URL)


def _id_from_href(href: str) -> str:
"""Extract the dataset id from a STAC child href by reading the last URL path segment."""
return urlparse(href).path.rstrip("/").rsplit("/", 1)[-1]
Expand Down Expand Up @@ -71,8 +65,8 @@ def open(self, dataset_id: str) -> xr.Dataset:
"""Open a published dataset as an xarray Dataset.

Fetches the STAC collection for ``dataset_id``, reads the Zarr asset
metadata, and returns the opened dataset. Coordinates are always
``time``, ``latitude``, and ``longitude``.
metadata, and returns the opened dataset. Spatial dimensions are always
named ``x`` and ``y``; the time dimension, when present, is ``time``.
"""
response = self._http.get(f"{self.base_url}/stac/collections/{dataset_id}")
response.raise_for_status()
Expand All @@ -92,55 +86,3 @@ def open(self, dataset_id: str) -> xr.Dataset:
if not isinstance(open_kwargs, dict):
raise ValueError(f"Zarr asset for '{dataset_id}' has a malformed xarray:open_kwargs field")
return xr.open_zarr(href, **open_kwargs) # type: ignore[no-any-return]


def list_datasets(base_url: str | None = None) -> list[dict]:
"""Return all published datasets from the STAC catalog.

Each entry is a STAC child link dict with at least ``id``, ``title``, and ``href``.
``base_url`` defaults to the ``CLIMATE_API_BASE_URL`` environment variable,
falling back to ``http://127.0.0.1:8000``.
"""
url = (base_url or _default_base_url()).rstrip("/")
response = httpx.get(f"{url}/stac/catalog.json", timeout=_DEFAULT_TIMEOUT)
response.raise_for_status()
catalog = response.json()
raw_links = catalog.get("links")
if not isinstance(raw_links, list):
raise ValueError(f"Invalid STAC catalog response from {url}: missing or non-list 'links' field")
links = []
for link in raw_links:
if isinstance(link, dict) and link.get("rel") == "child":
href = link.get("href")
if not isinstance(href, str) or not href:
raise ValueError(f"STAC child link from {url} has a missing or invalid href")
links.append({**link, "id": _id_from_href(href)})
return links


def open_dataset(dataset_id: str, *, base_url: str | None = None) -> xr.Dataset:
"""Open a published dataset as an xarray Dataset.

Fetches the STAC collection for ``dataset_id``, reads the Zarr asset
metadata, and returns the opened dataset. Coordinates are always
``time``, ``latitude``, and ``longitude``.
``base_url`` defaults to the ``CLIMATE_API_BASE_URL`` environment variable,
falling back to ``http://127.0.0.1:8000``.
"""
url = (base_url or _default_base_url()).rstrip("/")
response = httpx.get(f"{url}/stac/collections/{dataset_id}", timeout=_DEFAULT_TIMEOUT)
response.raise_for_status()
collection = response.json()
assets = collection.get("assets")
if not isinstance(assets, dict):
raise ValueError(f"STAC collection for '{dataset_id}' from {url} has a missing or invalid 'assets' field")
asset = assets.get("zarr")
if not isinstance(asset, dict):
raise ValueError(f"Dataset '{dataset_id}' has no Zarr asset in the STAC collection")
href = asset.get("href")
if not isinstance(href, str) or not href:
raise ValueError(f"Zarr asset for '{dataset_id}' has a missing or invalid href")
open_kwargs = asset.get("xarray:open_kwargs", {})
if not isinstance(open_kwargs, dict):
raise ValueError(f"Zarr asset for '{dataset_id}' has a malformed xarray:open_kwargs field")
return xr.open_zarr(href, **open_kwargs) # type: ignore[no-any-return]
7 changes: 4 additions & 3 deletions climate_api/data/datasets/chirps3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
sync:
kind: temporal
execution: append
availability:
latest_available_function: climate_api.providers.availability.chirps3_daily_latest_available
extents:
spatial:
bbox: [-180, -50, 180, 50]
temporal:
begin: "1981-01-01"
resolution: P1D
ingestion:
function: dhis2eo.data.chc.chirps3.daily.download
plugin: climate_api.ingest.plugins.chirps3.Chirps3Plugin
params:
stage: final
flavor: rnl
units: mm
resolution: 5 km x 5 km
source: CHIRPS v3
Expand Down
18 changes: 6 additions & 12 deletions climate_api/data/datasets/era5_land.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,16 @@
sync:
kind: temporal
execution: append
availability:
latest_available_function: climate_api.providers.availability.lagged_latest_available
lag_hours: 120
extents:
spatial:
bbox: [-180, -90, 180, 90]
temporal:
begin: "1950-01-01"
resolution: PT1H
ingestion:
function: dhis2eo.data.destine.era5_land.hourly.download
default_params:
variables: ['t2m']
plugin: climate_api.ingest.plugins.era5_land.Era5LandPlugin
params:
variable: t2m
transforms:
- climate_api.transforms.kelvin_to_celsius
units: degC
Expand All @@ -37,19 +34,16 @@
sync:
kind: temporal
execution: append
availability:
latest_available_function: climate_api.providers.availability.lagged_latest_available
lag_hours: 120
extents:
spatial:
bbox: [-180, -90, 180, 90]
temporal:
begin: "1950-01-01"
resolution: PT1H
ingestion:
function: dhis2eo.data.destine.era5_land.hourly.download
default_params:
variables: ['tp']
plugin: climate_api.ingest.plugins.era5_land.Era5LandPlugin
params:
variable: tp
transforms:
- climate_api.transforms.metres_to_mm
units: mm
Expand Down
10 changes: 3 additions & 7 deletions climate_api/data/datasets/worldpop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
period_type: yearly
sync:
kind: release
availability:
latest_available_function: climate_api.providers.availability.worldpop_release_latest_available
# WorldPop projections are intentionally request-driven for future years.
allow_future: true
extents:
spatial:
bbox: [-180, -90, 180, 90]
Expand All @@ -17,8 +13,9 @@
end: "2030"
resolution: P1Y
ingestion:
function: dhis2eo.data.worldpop.pop_total.yearly.download
default_params:
plugin: climate_api.ingest.plugins.worldpop.WorldPopPlugin
params:
# country_code is injected automatically from extent.country_code in climate-api.yaml
version: global2
units: people
resolution: 100m x 100m
Expand All @@ -27,4 +24,3 @@
display:
colormap: reds
range: [0.0, 25.0]
nodata: 0.0
81 changes: 44 additions & 37 deletions climate_api/data_accessor/services/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import logging
import os
import tempfile
from pathlib import Path
from typing import Any

import numpy as np
import xarray as xr
from pyproj import Transformer

from ...data_manager.services.downloader import get_cache_files, get_zarr_path
from ...data_manager.services.downloader import get_icechunk_path
from ...data_manager.services.utils import get_time_dim, get_x_y_dims
from ...shared.time import numpy_datetime_to_period_string

Expand All @@ -24,21 +25,8 @@ def get_data(
) -> xr.Dataset:
"""Load an xarray raster dataset for a given time range and bbox."""
logger.info("Opening dataset")
zarr_path = get_zarr_path(dataset)
if zarr_path:
logger.info(f"Using optimized zarr file: {zarr_path}")
ds = open_zarr_dataset(str(zarr_path))
else:
logger.warning(
f"Could not find optimized zarr file for dataset {dataset['id']}, using slower netcdf files instead."
)
files = get_cache_files(dataset)
ds = xr.open_mfdataset(
files,
data_vars="minimal",
coords="minimal", # pyright: ignore[reportArgumentType]
compat="override",
)
store_path = get_icechunk_path(dataset)
ds = open_icechunk_dataset(store_path)

if start and end:
logger.info(f"Subsetting time to {start} and {end}")
Expand Down Expand Up @@ -72,25 +60,10 @@ def get_data_coverage(dataset: dict[str, Any]) -> dict[str, Any]:
def get_data_coverage_for_paths(
dataset: dict[str, Any],
*,
zarr_path: str | None = None,
netcdf_paths: list[str] | None = None,
zarr_path: str,
) -> dict[str, Any]:
"""Return coverage metadata for the concrete files created for one artifact."""
if zarr_path is not None and netcdf_paths:
raise ValueError("Provide either zarr_path or netcdf_paths when computing coverage, not both")
if zarr_path is None and not netcdf_paths:
raise ValueError("Coverage calculation requires either zarr_path or at least one netcdf path")

if zarr_path is not None:
ds = open_zarr_dataset(zarr_path)
else:
assert netcdf_paths is not None
ds = xr.open_mfdataset(
netcdf_paths,
data_vars="minimal",
coords="minimal", # pyright: ignore[reportArgumentType]
compat="override",
)
"""Return coverage metadata for a materialized flat-zarr artifact."""
ds = open_zarr_dataset(zarr_path)

from climate_api import config as api_config

Expand Down Expand Up @@ -124,11 +97,40 @@ def open_zarr_dataset(zarr_path: str) -> xr.Dataset:
return ds


def open_icechunk_dataset(store_path: str | Path) -> xr.Dataset:
"""Open an Icechunk store as an xarray Dataset via a readonly MVCC session.

Detects multiscale pyramid stores (root group has ``multiscales`` in attrs)
and opens group ``0`` (full resolution) in that case.
"""
import icechunk
import zarr

path = Path(store_path)
if not path.exists():
raise FileNotFoundError(f"Icechunk store not found: {path}")
storage = icechunk.local_filesystem_storage(str(path))
repo = icechunk.Repository.open(storage)
session = repo.readonly_session("main")
root = zarr.open_group(session.store, mode="r")
group: str | None = "0" if "multiscales" in root.attrs else None
return xr.open_zarr(session.store, group=group) # type: ignore[no-any-return]


def _open_zarr(zarr_path: str) -> xr.Dataset:
"""Open a zarr store with automatic consolidated metadata detection."""
return xr.open_zarr(zarr_path, consolidated=None) # type: ignore[no-any-return]


def coverage_from_open_dataset(ds: xr.Dataset, *, period_type: str, native_crs: str = "EPSG:4326") -> dict[str, Any]:
"""Summarize temporal and spatial coverage for a caller-managed open dataset.

Unlike get_data_coverage_for_paths, this function does not close the dataset.
Use when the caller already holds a store handle (e.g. an Icechunk session store).
"""
return _coverage_from_dataset(ds=ds, period_type=period_type, native_crs=native_crs)


def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str, native_crs: str = "EPSG:4326") -> dict[str, Any]:
"""Summarize temporal and spatial coverage for an already opened dataset."""
if any(size == 0 for size in ds.sizes.values()):
Expand All @@ -141,11 +143,16 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str, native_crs: str
},
}

time_dim = get_time_dim(ds)
x_dim, y_dim = get_x_y_dims(ds)

start = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].min(), period_type)) # type: ignore[arg-type]
end = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].max(), period_type)) # type: ignore[arg-type]
start: str | None
end: str | None
try:
time_dim = get_time_dim(ds)
start = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].min(), period_type)) # type: ignore[arg-type]
end = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].max(), period_type)) # type: ignore[arg-type]
except ValueError:
start = end = None

xmin, xmax = ds[x_dim].min().item(), ds[x_dim].max().item()
ymin, ymax = ds[y_dim].min().item(), ds[y_dim].max().item()
Expand Down
1 change: 0 additions & 1 deletion climate_api/data_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
"""Data manager package."""

from . import routes as routes
from . import services as services
Loading
Loading