
Refactor compression pipeline: shared zarr LocalStore, hybrid MPI+threads, in-memory evaluation #4

Open

kotsaloscv wants to merge 14 commits into main from hpc_refactoring

Conversation

@kotsaloscv
Contributor

Summary

End-to-end refactor of the evaluate_combos / compress_with_optimal / merge_compressed_fields pipeline. The sweep is now fully in-memory, within-node parallelism moves from MPI ranks to a ThreadPoolExecutor, and compressed fields land directly in a shared {dataset}.zarr LocalStore instead of per-field .zarr.zip files that had to be unzipped and re-merged.

What changed

Storage layout

  • .zarr.zip is gone. compress_with_optimal writes each field directly into a shared {where_to_write}/{dataset}.zarr LocalStore (component = field name). merge_compressed_fields no longer unzips/copies/rezips — it just runs zarr.consolidate_metadata (see the sketch after this list).
  • Commands renamed: open_zarr_zip_file_and_inspect → open_zarr_and_inspect, from_zarr_zip_to_netcdf → from_zarr_to_netcdf. Both now take a .zarr directory path.
  • utils.open_dataset drops .zarr.zip support; .nc, .grib, .zarr remain.
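
A minimal sketch of the new layout, assuming zarr-python v3's LocalStore / open_group / consolidate_metadata API; paths, names, and the open mode are illustrative, not the PR's exact code:

import zarr

where_to_write, dataset = "/out", "mydata"                 # hypothetical placeholders
store = zarr.storage.LocalStore(f"{where_to_write}/{dataset}.zarr")
root = zarr.open_group(store, mode="a")                    # one shared store, one array per field
# compress_with_optimal: write each field as root[<field name>] (see the persistence path below)
# merge_compressed_fields: no unzip/copy/rezip any more, just consolidate the metadata
zarr.consolidate_metadata(store)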

Parallelism (hybrid MPI + threads)

  • evaluate_combos now requires 1 MPI rank per node; within-node parallelism is a ThreadPoolExecutor. Multi-rank-per-node launches are rejected with a clear launch-string hint.
  • New helpers (sketched after this list): detect_node_topology (MPI-3 Split_type, hostname fallback), detect_cores_available (cgroup/Slurm-aware via sched_getaffinity), compute_default_threads_per_rank.
  • New --threads-per-rank flag (auto-detected if omitted).
  • --oversubscription-check warns/aborts if OMP_NUM_THREADS / MKL_NUM_THREADS / OPENBLAS_NUM_THREADS / BLOSC_NTHREADS / NUMBA_NUM_THREADS aren't pinned to 1. Zarr v3's internal thread pool is also pinned.
  • Thread-safe progress_bar and Timer (locks around shared counters/dict).
  • All sys.exit(1) paths that could hang siblings at the next collective are now comm.Abort(1).
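
A sketch of what the topology and core-detection helpers above plausibly look like (names follow the PR, bodies are assumptions): node grouping via MPI-3 Split_type, core counts via sched_getaffinity so cgroup/Slurm CPU masks are respected, and comm.Abort(1) on a multi-rank-per-node launch.

import os
from mpi4py import MPI

def detect_node_topology(comm: MPI.Comm):
    """Split the world communicator into per-node sub-communicators (MPI-3 Split_type)."""
    node_comm = comm.Split_type(MPI.COMM_TYPE_SHARED)
    return node_comm, node_comm.Get_size(), node_comm.Get_rank()

def detect_cores_available() -> int:
    """Count only the cores this process may actually use (cgroups / Slurm cpusets)."""
    try:
        return len(os.sched_getaffinity(0))      # Linux: honors the CPU affinity mask
    except AttributeError:                       # non-Linux fallback
        return os.cpu_count() or 1

comm = MPI.COMM_WORLD
_, ranks_on_node, _ = detect_node_topology(comm)
if ranks_on_node > 1:                            # reject multi-rank-per-node launches
    if comm.Get_rank() == 0:
        print("Launch with exactly 1 MPI rank per node, e.g. srun --ntasks-per-node=1 ...")
    comm.Abort(1)                                # Abort, not sys.exit: siblings must not hang
threads_per_rank = detect_cores_available()      # default for --threads-per-rank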

Evaluation pipeline

  • evaluate_codec_pipeline runs entirely against a MemoryStore — no disk I/O per combo, no zip wrapping. Error norms are accumulated chunk-wise so a full decompressed copy of the sample is never held in memory (see the sketch after this list).
  • Dask is forced to the synchronous scheduler inside each thread (scoped via with dask.config.set) to prevent nested pools.
  • Per-combo failures are collected rather than fatal; counts are reduced across ranks and reported on rank 0.
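
A condensed sketch of evaluating one combo fully in memory, assuming zarr-python v3's MemoryStore / create_array API (the real evaluate_codec_pipeline also tracks compression ratio, additional norms, and failure collection):

import dask
import dask.array as da
import numpy as np
import zarr

def evaluate_one_combo(sample: np.ndarray, filters, compressors) -> float:
    store = zarr.storage.MemoryStore()                  # no disk I/O, no zip wrapping
    z = zarr.create_array(store, name="sample", shape=sample.shape,
                          dtype=sample.dtype, filters=filters, compressors=compressors)
    with dask.config.set(scheduler="synchronous"):      # scoped per thread: no nested pools
        da.from_array(sample, chunks=z.chunks).store(z, lock=False)
    # accumulate the error norm one chunk-row at a time; a full decompressed
    # copy of the sample is never materialized
    sq_err, step = 0.0, z.chunks[0]
    for start in range(0, sample.shape[0], step):
        diff = sample[start:start + step] - z[start:start + step]
        sq_err += float(np.sum(diff * diff))
    return float(np.sqrt(sq_err))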

Persistence path (with sharding)

  • New persist_with_codec_pipeline writes via dask.array.to_zarr with inner chunks + shards; Dask chunks are rechunked to shard shape so each write = one shard (see the sketch after this list).
  • compress_with_optimal gains --inner-chunk-mib (default 16), --shard-mib (default 512), --threads, and --verify/--no-verify (on by default; disable to skip the re-read pass for trusted combos).
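
A sketch of the sharded persistence path under the same zarr v3 assumption (the PR routes the write through dask.array.to_zarr; here it is shown via Array.store onto a pre-created sharded array). The rechunk to the shard shape is what makes each Dask task write exactly one shard:

import dask.array as da
import zarr

def persist_field(darr: da.Array, store, name: str,
                  inner_chunks: tuple, shard_shape: tuple, filters, compressors):
    z = zarr.create_array(store, name=name, shape=darr.shape, dtype=darr.dtype,
                          chunks=inner_chunks,   # inner chunks sized from --inner-chunk-mib
                          shards=shard_shape,    # shards sized from --shard-mib
                          filters=filters, compressors=compressors)
    # align Dask chunks with shard boundaries: one task == one whole shard
    darr.rechunk(shard_shape).store(z, lock=False)
    return z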

Representative sampling

  • New build_representative_sample (sketched after this list): stride-samples along the leading dim via np.linspace, keeping trailing spatial dims full. Deterministic so evaluate_combos and compress_with_optimal build identical codec spaces.
  • New --eval-data-size-limit flag (default "5GB") with parse_size helper for GB/GiB/MiB/... strings. Must match between evaluate_combos and compress_with_optimal so the codec-space indices resolve to the same objects.
  • Old --field-percentage-to-compress is removed.
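
Assumed implementations of the two helpers, matching the behaviour described above (deterministic np.linspace stride sampling; parse_size accepting both SI and binary suffixes):

import re
import numpy as np

_UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
          "KIB": 2**10, "MIB": 2**20, "GIB": 2**30, "TIB": 2**40}

def parse_size(text: str) -> int:
    """'5GB' -> 5_000_000_000, '512MiB' -> 536_870_912."""
    m = re.fullmatch(r"\s*([\d.]+)\s*([KMGT]?i?B)\s*", text, flags=re.IGNORECASE)
    if m is None:
        raise ValueError(f"cannot parse size: {text!r}")
    return int(float(m.group(1)) * _UNITS[m.group(2).upper()])

def build_representative_sample(arr, size_limit: str = "5GB") -> np.ndarray:
    """Evenly spaced slices along the leading dim; trailing spatial dims stay full."""
    bytes_per_slice = arr[0:1].nbytes
    n = max(1, min(arr.shape[0], parse_size(size_limit) // bytes_per_slice))
    idx = np.unique(np.linspace(0, arr.shape[0] - 1, num=int(n), dtype=int))
    return np.asarray(arr[idx])          # deterministic for a given shape and limit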

MPI

  • New broadcast_numpy uses Bcast (uppercase, buffer-protocol) with shape/dtype metadata piggybacked over pickle. Because the payload is now counted in native-dtype elements (at most 2**31 - 1) rather than bytes, the per-broadcast ceiling rises from ~2 GB (the old [buf, MPI.BYTE] path would silently cap / corrupt at the 5 GB default sample budget) to ~16 GB for float64.
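
A sketch of the described broadcast (assumed code, not the PR's exact helper): the small shape/dtype tuple goes through the pickle-based bcast, the payload through the buffer-protocol Bcast with the array's native datatype.

import numpy as np
from mpi4py import MPI

def broadcast_numpy(arr, comm: MPI.Comm, root: int = 0) -> np.ndarray:
    """arr holds the payload on root and may be None on other ranks."""
    meta = (arr.shape, arr.dtype.str) if comm.Get_rank() == root else None
    shape, dtype = comm.bcast(meta, root=root)     # tiny metadata, pickled
    if comm.Get_rank() == root:
        arr = np.ascontiguousarray(arr)            # Bcast needs a contiguous buffer
    else:
        arr = np.empty(shape, dtype=dtype)
    comm.Bcast(arr, root=root)                     # counted in elements, not bytes
    return arr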

Results / audit trail

  • Per-rank streaming CSVs config_space_{var}_rank{N}.csv (flushed per row, so they survive mid-sweep crashes) are consolidated into results_{var}.parquet on rank 0. Both passing and filtered-out combos are recorded, distinguished by a keep column (see the sketch after this list).
  • Per-variable filenames use var (not field_to_compress or "all") so multi-field runs no longer collide.
  • analyze_clustering now takes required --where-to-write and --var flags instead of silently reading config_space.csv from cwd.
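
An illustrative consolidation step (assumed code; file names follow the bullets above):

import glob
import pandas as pd

def consolidate_results(where_to_write: str, var: str, rank: int) -> None:
    """Rank 0 gathers the per-rank streaming CSVs into one parquet file."""
    if rank != 0:
        return
    parts = sorted(glob.glob(f"{where_to_write}/config_space_{var}_rank*.csv"))
    df = pd.concat([pd.read_csv(p) for p in parts], ignore_index=True)
    # kept and filtered-out combos both survive, distinguished by the keep column
    df.to_parquet(f"{where_to_write}/results_{var}.parquet", index=False)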

Imports / deps

  • Heavyweight optional deps (matplotlib, sklearn, plotly, tqdm) are now imported lazily inside perform_clustering / analyze_clustering. evaluate_combos and compress_with_optimal no longer pay their import cost (pattern sketched after this list).
  • pyproject.toml: dropped strict pins on numpy, dask, zarr, numcodecs.
  • Dockerfile: removed the pip install --force-reinstall "dask[...]" "numpy==..." line that conflicted with the loosened pins.
  • .gitignore: added *.parquet.
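
The lazy-import pattern referred to above, in sketch form (function body is illustrative):

def analyze_clustering(where_to_write: str, var: str):
    # imported on call, not at module import time, so evaluate_combos and
    # compress_with_optimal never pay the matplotlib/sklearn/plotly/tqdm cost
    import matplotlib.pyplot as plt
    from sklearn.cluster import KMeans
    ...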

Breaking changes

  • .zarr.zip output and readers are removed.
  • Command renames: open_zarr_zip_file_and_inspect → open_zarr_and_inspect, from_zarr_zip_to_netcdf → from_zarr_to_netcdf.
  • evaluate_combos: where_to_write is now the flag --where-to-write (required); --field-percentage-to-compress removed; --eval-data-size-limit added.
  • compress_with_optimal: must be run with the same --eval-data-size-limit as the sweep; new --inner-chunk-mib / --shard-mib / --threads / --verify flags.
  • analyze_clustering: --where-to-write and --var are now required.
  • evaluate_combos now requires 1 MPI rank per node; relaunch with mpirun -n <NODES> --map-by ppr:1:node ... (Open MPI) or srun --nodes=<N> --ntasks-per-node=1 ....

Migration

# Old
mpirun -n 32 dc_toolkit evaluate_combos input.nc /out --field-percentage-to-compress 10
# New
mpirun -n <NODES> --map-by ppr:1:node \
  dc_toolkit evaluate_combos input.nc \
    --where-to-write /out \
    --eval-data-size-limit 5GB

Pass the same --eval-data-size-limit to compress_with_optimal; otherwise the (comp_idx, filt_idx, ser_idx) indices returned by the sweep may resolve to codec objects with slightly different parameters (symptom: a worse compression ratio than the sweep reported, with no error raised).

@kotsaloscv kotsaloscv requested a review from nfarabullini April 23, 2026 09:16
