feat: ResampledPlugin wrapper — store high-frequency source data at lower frequency during ingest #145

@turban


Problem

ERA5-Land and other sources publish data at hourly (or sub-daily) frequency, but most downstream use cases only need daily aggregates. Storing every hourly step takes roughly 24× the space of a daily series and makes the Icechunk store slower to query.

There is no way today to express "fetch hourly, write daily" in the ingest pipeline without writing a custom plugin.

Proposed solution

Add a generic ResampledPlugin wrapper in climate_api/ingest/plugins/resample.py.

The wrapper implements the IngestionPlugin protocol and delegates to any inner plugin, but changes the period granularity at the boundary between fetch and write:

from __future__ import annotations  # IngestionPlugin / GridSpec come from the project

import xarray as xr


class ResampledPlugin:
    def __init__(self, inner: IngestionPlugin, frequency: str, method: str) -> None: ...

    async def probe(self, bbox, **params) -> GridSpec: ...      # delegates, patches time_dim

    async def periods(self, start, end) -> list[str]:
        # Convert inner source periods → unique output-period IDs
        # e.g. 24 hourly IDs → 1 daily ID "YYYY-MM-DD"
        ...

    async def fetch_period(self, period_id, bbox, **params) -> xr.Dataset:
        # Derive all source sub-periods for this output period
        # Fetch them concurrently (respecting inner.max_concurrency)
        # Concatenate + resample using _resample_dataset() from processing/resample.py
        ...
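As a sketch of the periods() step, the snippet below collapses source period IDs into unique output IDs while preserving order. It assumes hourly source IDs look like "YYYY-MM-DDTHH" and that daily, weekly and monthly outputs are keyed as "YYYY-MM-DD", ISO "YYYY-Www" and "YYYY-MM"; the real ID formats of the inner plugins may differ, and output_period_id / collapse_periods are hypothetical helper names.

```python
from datetime import datetime


def output_period_id(source_id: str, frequency: str) -> str:
    """Map one source period ID (assumed 'YYYY-MM-DDTHH') to its output period ID."""
    ts = datetime.strptime(source_id, "%Y-%m-%dT%H")
    if frequency == "daily":
        return ts.strftime("%Y-%m-%d")
    if frequency == "weekly":
        # ISO week: Monday..Sunday, and the ISO year may differ near New Year
        year, week, _ = ts.isocalendar()
        return f"{year}-W{week:02d}"
    if frequency == "monthly":
        return ts.strftime("%Y-%m")
    raise ValueError(f"unsupported frequency: {frequency!r}")


def collapse_periods(source_ids: list[str], frequency: str) -> list[str]:
    """Return unique output period IDs in first-seen order."""
    # dict.fromkeys de-duplicates while preserving insertion order
    return list(dict.fromkeys(output_period_id(s, frequency) for s in source_ids))


hourly = [f"2024-01-0{d}T{h:02d}" for d in (1, 2) for h in range(24)]
print(collapse_periods(hourly, "daily"))    # ['2024-01-01', '2024-01-02']
print(collapse_periods(hourly, "monthly"))  # ['2024-01']
```

Using ISO weeks avoids having to pick a week-start weekday; if the store needs another convention, only output_period_id has to change.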

The orchestrator is unchanged — it sees only daily (or weekly, monthly…) periods and writes one commit per output period.
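To make the fetch side concrete, here is a self-contained toy of the wrapper pattern using only asyncio. The inner plugin is a fake that returns one float per hour instead of an xr.Dataset, the signatures drop bbox/params, and aggregation is a plain mean standing in for _resample_dataset(); FakeHourlyPlugin, ToyResampledPlugin and the ID formats are illustrative assumptions. It shows the two properties the proposal relies on: the caller only ever sees daily period IDs, and sub-period fetches are capped at inner.max_concurrency by a semaphore.

```python
import asyncio
from datetime import datetime, timedelta
from statistics import fmean


class FakeHourlyPlugin:
    """Hypothetical inner plugin: one value per hourly period ID 'YYYY-MM-DDTHH'."""

    max_concurrency = 4

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak_in_flight = 0

    async def periods(self, start: str, end: str) -> list[str]:
        t, stop = datetime.fromisoformat(start), datetime.fromisoformat(end)
        ids = []
        while t < stop:  # end is exclusive
            ids.append(t.strftime("%Y-%m-%dT%H"))
            t += timedelta(hours=1)
        return ids

    async def fetch_period(self, period_id: str) -> float:
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        await asyncio.sleep(0)  # simulate I/O, letting other fetches run
        self.in_flight -= 1
        return float(int(period_id[-2:]))  # value = hour of day


class ToyResampledPlugin:
    """Daily-only toy of the wrapper: exposes daily IDs, fetches hours concurrently."""

    def __init__(self, inner: FakeHourlyPlugin) -> None:
        self.inner = inner

    async def periods(self, start: str, end: str) -> list[str]:
        # Collapse hourly IDs to unique days, keeping order
        hours = await self.inner.periods(start, end)
        return list(dict.fromkeys(h[:10] for h in hours))

    async def fetch_period(self, period_id: str) -> float:
        # Derive the 24 hourly sub-periods of this day
        subs = [f"{period_id}T{h:02d}" for h in range(24)]
        sem = asyncio.Semaphore(self.inner.max_concurrency)

        async def bounded(sub: str) -> float:
            async with sem:  # at most max_concurrency fetches at once
                return await self.inner.fetch_period(sub)

        values = await asyncio.gather(*(bounded(s) for s in subs))
        return fmean(values)  # stands in for _resample_dataset(..., method="mean")


async def main() -> tuple[dict[str, float], int]:
    inner = FakeHourlyPlugin()
    wrapper = ToyResampledPlugin(inner)
    result = {}
    # This loop is all the (unchanged) orchestrator would see: daily IDs only
    for pid in await wrapper.periods("2024-01-01T00:00", "2024-01-03T00:00"):
        result[pid] = await wrapper.fetch_period(pid)
    return result, inner.peak_in_flight


result, peak = asyncio.run(main())
print(result)  # {'2024-01-01': 11.5, '2024-01-02': 11.5}
```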

YAML config

An optional resample block under ingestion activates the wrapper:

# era5_land.yaml
- id: era5land_temperature_daily
  period_type: daily
  ingestion:
    plugin: climate_api.ingest.plugins.era5_land.Era5LandPlugin
    params:
      variable: t2m
    resample:
      frequency: daily   # "daily" | "weekly" | "monthly"
      method: mean       # xarray resample method: mean, sum, min, max, …
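A minimal sketch of how the resample block might be validated once the YAML is loaded into a dict. The translation from the frequency keywords to pandas offset aliases ("1D", "W-SUN" for Monday to Sunday weeks, "MS" for calendar months) and the whitelist of methods are assumptions for illustration, not settled parts of the proposal; parse_resample_block and ResampleSpec are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed mapping from the YAML keywords to pandas offset aliases
FREQUENCY_RULES = {"daily": "1D", "weekly": "W-SUN", "monthly": "MS"}
# Assumed whitelist of xarray resample reductions
ALLOWED_METHODS = {"mean", "sum", "min", "max", "median", "std", "var"}


@dataclass(frozen=True)
class ResampleSpec:
    frequency: str  # YAML keyword, e.g. "daily"
    rule: str       # pandas offset alias for .resample(time=...)
    method: str     # name of the reduction, e.g. "mean"


def parse_resample_block(ingestion: dict) -> Optional[ResampleSpec]:
    """Return a ResampleSpec if `ingestion` has a 'resample' block, else None."""
    block = ingestion.get("resample")
    if block is None:
        return None
    frequency, method = block.get("frequency"), block.get("method")
    if frequency not in FREQUENCY_RULES:
        raise ValueError(f"resample.frequency must be one of {sorted(FREQUENCY_RULES)}, got {frequency!r}")
    if method not in ALLOWED_METHODS:
        raise ValueError(f"resample.method must be one of {sorted(ALLOWED_METHODS)}, got {method!r}")
    return ResampleSpec(frequency, FREQUENCY_RULES[frequency], method)


ingestion = {
    "plugin": "climate_api.ingest.plugins.era5_land.Era5LandPlugin",
    "params": {"variable": "t2m"},
    "resample": {"frequency": "daily", "method": "mean"},
}
spec = parse_resample_block(ingestion)
print(spec.rule, spec.method)  # 1D mean
```

Failing fast at config load time means a typo such as "frequency: dialy" surfaces before any data is fetched.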

_create_icechunk_artifact in ingestions/services.py reads ingestion.resample, instantiates the inner plugin normally, then wraps it with ResampledPlugin before passing it to run_ingest_sync.
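The wiring could look roughly like the sketch below: resolve the dotted plugin path with importlib, instantiate the inner plugin, and wrap it only when a resample block is present. build_plugin, load_class and ResampledStub are hypothetical; the no-argument constructor is an assumption (the fetch_period signature suggests params are passed at fetch time), and the hand-off to run_ingest_sync is omitted.

```python
import importlib
from typing import Any


class ResampledStub:
    """Hypothetical stand-in for ResampledPlugin; only records what it wraps."""

    def __init__(self, inner: Any, frequency: str, method: str) -> None:
        self.inner, self.frequency, self.method = inner, frequency, method


def load_class(dotted_path: str) -> type:
    # "pkg.module.ClassName" -> the ClassName object
    module_name, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


def build_plugin(ingestion: dict, wrapper: type = ResampledStub) -> Any:
    """Instantiate the inner plugin, then wrap it if `ingestion` has a resample block."""
    plugin = load_class(ingestion["plugin"])()  # assumption: no-arg constructor
    resample = ingestion.get("resample")
    if resample:
        plugin = wrapper(plugin, resample["frequency"], resample["method"])
    return plugin


# A stdlib class stands in for Era5LandPlugin so the example runs anywhere
plain = build_plugin({"plugin": "collections.OrderedDict"})
wrapped = build_plugin({
    "plugin": "collections.OrderedDict",
    "resample": {"frequency": "daily", "method": "mean"},
})
print(type(plain).__name__, type(wrapped).__name__)  # OrderedDict ResampledStub
```

Passing the wrapper class as a parameter keeps the function testable without importing the real plugin module.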

Reuse

_resample_dataset() in climate_api/processing/resample.py already handles the xarray .resample().{method}() logic, including edge-period trimming. ResampledPlugin.fetch_period can call it directly after concatenating the inner datasets.
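For reference, the core of that step in plain xarray is a concatenation along time followed by .resample(time=rule) and a reduction looked up by name. This is a sketch of the idea, not the body of _resample_dataset() (which also trims edge periods); concat_and_resample is a hypothetical name and the hourly t2m values are synthetic.

```python
import pandas as pd
import xarray as xr


def concat_and_resample(parts: list[xr.Dataset], rule: str, method: str) -> xr.Dataset:
    """Concatenate per-sub-period datasets along time, then reduce to `rule` bins."""
    combined = xr.concat(parts, dim="time").sortby("time")
    resampler = combined.resample(time=rule)
    # e.g. method="mean" -> resampler.mean()
    return getattr(resampler, method)()


# Synthetic input: 48 single-hour datasets, as an hourly inner plugin might return
times = pd.date_range("2024-01-01", periods=48, freq="60min")
parts = [
    xr.Dataset({"t2m": ("time", [float(i)])}, coords={"time": times[i : i + 1]})
    for i in range(len(times))
]

daily = concat_and_resample(parts, "1D", "mean")
print(daily["t2m"].values)  # [11.5 35.5]
```

Looking the reduction up with getattr means any method the xarray resampler provides (mean, sum, min, max, …) works without a separate lookup table.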

Out of scope

  • Changing the existing era5land_temperature_hourly dataset definition (keep it for users who need raw hourly data)
  • Adding a new era5land_temperature_daily dataset YAML entry — straightforward once the wrapper exists

Files to create / modify

| File | Change |
|---|---|
| climate_api/ingest/plugins/resample.py | New: ResampledPlugin |
| climate_api/ingestions/services.py | Read ingestion.resample, wrap plugin |
| climate_api/data/datasets/era5_land.yaml | Add era5land_temperature_daily entry |
| tests/test_ingest_plugins.py | Unit tests for ResampledPlugin |

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
