From 40dd9032c76918e25f3d4c5f525b6268dd1075c2 Mon Sep 17 00:00:00 2001 From: "warre.veys" Date: Wed, 6 May 2026 11:40:53 +0200 Subject: [PATCH 1/7] feat: optional save_rankings artifact for ranking tasks Add a `save_rankings: bool = False` flag to `workrb.evaluate()` that persists per-target ranking score arrays for each ranking-task dataset under `/rankings//__.json`. Each artifact also records `model_name` in its payload so files remain self-describing if moved. To enable this without recomputing the prediction matrix, `RankingTask.evaluate` is split into `compute_prediction_matrix` + `compute_metrics_from_prediction_matrix`; default behavior is unchanged. --- README.md | 15 +++ src/workrb/config.py | 42 ++++++ src/workrb/run.py | 47 ++++++- src/workrb/tasks/abstract/ranking_base.py | 47 ++++--- tests/test_save_rankings_artifacts.py | 154 ++++++++++++++++++++++ 5 files changed, 285 insertions(+), 20 deletions(-) create mode 100644 tests/test_save_rankings_artifacts.py diff --git a/README.md b/README.md index 5deeb16..41e4df4 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ results = workrb.evaluate( # Returns BenchmarkResults (Pydantic model) model, tasks, output_folder="results/my_model", + save_rankings=False, # Optional: store full per-target score arrays for ranking tasks ) print(results) # Benchmark/Per-task/Per-language metrics ``` @@ -208,6 +209,20 @@ results/my_model/ └── config.yaml # Final benchmark configuration dump ``` +If you pass `save_rankings=True` to `evaluate`, WorkRB also writes per-task, +per-dataset ranking score artifacts under a model-scoped subdirectory: + +``` +results/my_model/ +└── rankings/ + └── / + └── __.json +``` + +Each JSON contains `model_name`, `task_name`, `dataset_id`, `num_queries`, +`num_targets`, and `scores_by_target` (a mapping from target text to its +score across all queries). + To load & parse results from a run: ```python diff --git a/src/workrb/config.py b/src/workrb/config.py index 14daf6a..4d6df53 100644 --- a/src/workrb/config.py +++ b/src/workrb/config.py @@ -7,6 +7,7 @@ import json import logging +import re import time from collections.abc import Sequence from dataclasses import asdict, dataclass, field @@ -134,6 +135,47 @@ def get_results_path(self) -> Path: """Get the path where final results should be saved.""" return self.get_output_path() / "results.json" + def get_rankings_dir(self) -> Path: + """Get the directory where per-dataset ranking artifacts are saved. + + Rankings are nested under a sanitized model-name directory so that + running multiple models into the same ``output_folder`` cannot clobber + each other's ranking files. + """ + safe_model_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", self.model_name).strip("_") + return self.get_output_path() / "rankings" / safe_model_name + + def get_task_rankings_path(self, task_name: str, dataset_id: str) -> Path: + """Get the output path for one task/dataset ranking artifact.""" + safe_task_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", task_name).strip("_") + safe_dataset_id = re.sub(r"[^A-Za-z0-9_.-]+", "_", dataset_id).strip("_") + filename = f"{safe_task_name}__{safe_dataset_id}.json" + return self.get_rankings_dir() / filename + + def save_rankings_artifact( + self, + task_name: str, + dataset_id: str, + scores_by_target: dict[str, list[float]], + num_queries: int, + num_targets: int, + ) -> Path: + """Save full ranking scores for one dataset as a JSON artifact.""" + rankings_path = self.get_task_rankings_path(task_name=task_name, dataset_id=dataset_id) + rankings_path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "model_name": self.model_name, + "task_name": task_name, + "dataset_id": dataset_id, + "num_queries": num_queries, + "num_targets": num_targets, + "scores_by_target": scores_by_target, + } + with open(rankings_path, "w") as f: + json.dump(payload, f, indent=2) + logger.debug(f"Ranking artifact saved to {rankings_path}") + return rankings_path + def has_checkpoint(self) -> bool: """Check if a checkpoint exists.""" return self.get_checkpoint_path().exists() diff --git a/src/workrb/run.py b/src/workrb/run.py index aefdeb1..7ca99b1 100644 --- a/src/workrb/run.py +++ b/src/workrb/run.py @@ -23,6 +23,7 @@ TaskResults, ) from workrb.tasks.abstract.base import Task +from workrb.tasks.abstract.ranking_base import RankingTask from workrb.types import ExecutionMode, LanguageAggregationMode, get_language_grouping_key logger = logging.getLogger(__name__) @@ -36,6 +37,7 @@ def evaluate( metrics: dict[str, list[str]] | None = None, description: str = "", force_restart: bool = False, + save_rankings: bool = False, language_aggregation_mode: LanguageAggregationMode = LanguageAggregationMode.MONOLINGUAL_ONLY, execution_mode: ExecutionMode = ExecutionMode.LAZY, ) -> BenchmarkResults: @@ -49,6 +51,10 @@ def evaluate( metrics: Optional dict mapping task names to custom metrics lists description: Description for the benchmark run force_restart: If True, ignore checkpoints and restart from beginning + save_rankings: If True, save per-target ranking score arrays for each + ranking task dataset under + ``/rankings//`` as JSON artifacts. + Has no effect for non-ranking tasks. Defaults to False. language_aggregation_mode: How per-language results should be grouped when calling ``get_summary_metrics()`` on the returned results. When ``execution_mode`` is ``LAZY``, datasets that are @@ -111,6 +117,7 @@ def evaluate( results=results, model=model, metrics=metrics, + save_rankings=save_rankings, total_evaluations=total_evaluations, ) if results.metadata.resumed_from_checkpoint: @@ -429,6 +436,7 @@ def _run_pending_work( results: BenchmarkResults, model: ModelInterface, metrics: dict[str, list[str]] | None, + save_rankings: bool, total_evaluations: int, ) -> BenchmarkResults: """Run pending evaluations. @@ -439,6 +447,7 @@ def _run_pending_work( results: BenchmarkResults object to store results. model: ModelInterface object to evaluate. metrics: Dictionary of task names to their custom metrics. + save_rankings: If True, save full ranking score artifacts for ranking tasks. total_evaluations: Total number of compatible evaluations (for progress display). """ # Run pending evaluations @@ -476,9 +485,30 @@ def _run_pending_work( try: start_time_eval = time.time() - dataset_results: dict[str, float] = task.evaluate( - model=model, metrics=task_metrics, dataset_id=dataset_id - ) + if save_rankings and isinstance(task, RankingTask): + prediction_matrix = task.compute_prediction_matrix( + model=model, dataset_id=dataset_id + ) + dataset_results = task.compute_metrics_from_prediction_matrix( + prediction_matrix=prediction_matrix, + dataset_id=dataset_id, + metrics=task_metrics, + ) + rankings_path = config.save_rankings_artifact( + task_name=task.name, + dataset_id=dataset_id, + scores_by_target=_build_scores_by_target( + target_space=task.datasets[dataset_id].target_space, + prediction_matrix=prediction_matrix, + ), + num_queries=prediction_matrix.shape[0], + num_targets=prediction_matrix.shape[1], + ) + logger.info(f"\tSaved ranking scores to: {rankings_path}") + else: + dataset_results: dict[str, float] = task.evaluate( + model=model, metrics=task_metrics, dataset_id=dataset_id + ) evaluation_time = time.time() - start_time_eval # Store results @@ -508,3 +538,14 @@ def _run_pending_work( logger.info(f"Completed {run_idx} / {total_evaluations} evaluations. ") return results + + +def _build_scores_by_target( + target_space: list[str], + prediction_matrix: Any, +) -> dict[str, list[float]]: + """Build a mapping from target text to its scores across all queries.""" + return { + target_text: prediction_matrix[:, idx].tolist() + for idx, target_text in enumerate(target_space) + } diff --git a/src/workrb/tasks/abstract/ranking_base.py b/src/workrb/tasks/abstract/ranking_base.py index ed99200..efc88ba 100644 --- a/src/workrb/tasks/abstract/ranking_base.py +++ b/src/workrb/tasks/abstract/ranking_base.py @@ -8,6 +8,7 @@ from enum import Enum from typing import TYPE_CHECKING +import numpy as np import torch from workrb.metrics.ranking import calculate_ranking_metrics @@ -306,30 +307,42 @@ def evaluate( dict[str, float] Dictionary containing metric scores and evaluation metadata. """ - if metrics is None: - metrics = self.default_metrics + prediction_matrix = self.compute_prediction_matrix(model=model, dataset_id=dataset_id) + return self.compute_metrics_from_prediction_matrix( + prediction_matrix=prediction_matrix, + dataset_id=dataset_id, + metrics=metrics, + ) - # Retrieve dataset by ID + def compute_prediction_matrix( + self, + model: ModelInterface, + dataset_id: str = "en", + ) -> np.ndarray: + """Compute the ranking score matrix for a dataset.""" dataset = self.datasets[dataset_id] - queries = dataset.query_texts - targets = dataset.target_space - labels = dataset.target_indices - - # Get model predictions (similarity matrix) prediction_matrix = model.compute_rankings( - queries=queries, - targets=targets, + queries=dataset.query_texts, + targets=dataset.target_space, query_input_type=self.query_input_type, target_input_type=self.target_input_type, ) - - # Convert to numpy if needed if isinstance(prediction_matrix, torch.Tensor): prediction_matrix = prediction_matrix.cpu().float().numpy() + return prediction_matrix - # Calculate metrics - metric_results = calculate_ranking_metrics( - prediction_matrix=prediction_matrix, pos_label_idxs=labels, metrics=metrics + def compute_metrics_from_prediction_matrix( + self, + prediction_matrix: np.ndarray, + dataset_id: str = "en", + metrics: list[str] | None = None, + ) -> dict[str, float]: + """Compute ranking metrics from a precomputed prediction matrix.""" + if metrics is None: + metrics = self.default_metrics + dataset = self.datasets[dataset_id] + return calculate_ranking_metrics( + prediction_matrix=prediction_matrix, + pos_label_idxs=dataset.target_indices, + metrics=metrics, ) - - return metric_results diff --git a/tests/test_save_rankings_artifacts.py b/tests/test_save_rankings_artifacts.py new file mode 100644 index 0000000..2d0e104 --- /dev/null +++ b/tests/test_save_rankings_artifacts.py @@ -0,0 +1,154 @@ +"""Tests for ranking artifact persistence in evaluate(save_rankings=...).""" + +import json +import shutil +from pathlib import Path + +import pytest +import torch + +import workrb +from workrb.models.base import ModelInterface +from workrb.tasks.abstract.base import DatasetSplit, LabelType, Language +from workrb.tasks.abstract.ranking_base import RankingDataset, RankingTask, RankingTaskGroup +from workrb.types import ModelInputType + + +class TinyRankingTask(RankingTask): + """Minimal ranking task used to test ranking artifact persistence.""" + + @property + def name(self) -> str: + return "Tiny Ranking Task" + + @property + def description(self) -> str: + return "Tiny in-memory ranking task for tests." + + @property + def supported_query_languages(self) -> list[Language]: + return [Language.EN] + + @property + def supported_target_languages(self) -> list[Language]: + return [Language.EN] + + @property + def task_group(self) -> RankingTaskGroup: + return RankingTaskGroup.SKILL_EXTRACTION + + @property + def label_type(self) -> LabelType: + return LabelType.MULTI_LABEL + + @property + def query_input_type(self) -> ModelInputType: + return ModelInputType.SKILL_SENTENCE + + @property + def target_input_type(self) -> ModelInputType: + return ModelInputType.SKILL_NAME + + def load_dataset(self, dataset_id: str, split: DatasetSplit) -> RankingDataset: + return RankingDataset( + query_texts=["query one", "query two"], + target_indices=[[0], [1]], + target_space=["target_a", "target_b", "target_c"], + dataset_id=dataset_id, + ) + + +class TinyDeterministicModel(ModelInterface): + """Deterministic model with fixed ranking scores for tests.""" + + @property + def name(self) -> str: + return "tiny-deterministic-model" + + @property + def description(self) -> str: + return "Tiny deterministic model for ranking artifact tests." + + def _compute_rankings( + self, + queries: list[str], + targets: list[str], + query_input_type: ModelInputType, + target_input_type: ModelInputType, + ) -> torch.Tensor: + # 2 queries x 3 targets + return torch.tensor( + [ + [0.1, 0.2, 0.3], + [0.4, 0.5, 0.6], + ], + dtype=torch.float32, + ) + + def _compute_classification( + self, + texts: list[str], + targets: list[str], + input_type: ModelInputType, + target_input_type: ModelInputType | None = None, + ) -> torch.Tensor: + return torch.zeros((len(texts), len(targets))) + + @property + def classification_label_space(self) -> list[str] | None: + return None + + +def test_evaluate_saves_rankings_artifact_when_enabled(): + output_folder = Path("tmp/rankings_artifact_test_enabled") + if output_folder.exists(): + shutil.rmtree(output_folder, ignore_errors=True) + + model = TinyDeterministicModel() + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + + _ = workrb.evaluate( + model=model, + tasks=tasks, + output_folder=str(output_folder), + force_restart=True, + save_rankings=True, + ) + + rankings_dir = output_folder / "rankings" / model.name + ranking_files = list(rankings_dir.glob("*.json")) + assert len(ranking_files) == 1 + + with open(ranking_files[0]) as f: + payload = json.load(f) + + assert payload["model_name"] == model.name + assert payload["task_name"] == "Tiny Ranking Task" + assert payload["dataset_id"] == "en" + assert payload["num_queries"] == 2 + assert payload["num_targets"] == 3 + + scores_by_target = payload["scores_by_target"] + assert set(scores_by_target.keys()) == {"target_a", "target_b", "target_c"} + assert scores_by_target["target_a"] == pytest.approx([0.1, 0.4]) + assert scores_by_target["target_b"] == pytest.approx([0.2, 0.5]) + assert scores_by_target["target_c"] == pytest.approx([0.3, 0.6]) + + +def test_evaluate_does_not_save_rankings_artifact_by_default(): + output_folder = Path("tmp/rankings_artifact_test_disabled") + if output_folder.exists(): + shutil.rmtree(output_folder, ignore_errors=True) + + model = TinyDeterministicModel() + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + + _ = workrb.evaluate( + model=model, + tasks=tasks, + output_folder=str(output_folder), + force_restart=True, + ) + + rankings_dir = output_folder / "rankings" + assert not rankings_dir.exists() From c603bafc27fe0dc2fed5ec194d1220b094c7301b Mon Sep 17 00:00:00 2001 From: "warre.veys" Date: Wed, 6 May 2026 12:03:48 +0200 Subject: [PATCH 2/7] ruff fixes --- tests/test_save_rankings_artifacts.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_save_rankings_artifacts.py b/tests/test_save_rankings_artifacts.py index 2d0e104..74c1fb0 100644 --- a/tests/test_save_rankings_artifacts.py +++ b/tests/test_save_rankings_artifacts.py @@ -100,6 +100,7 @@ def classification_label_space(self) -> list[str] | None: def test_evaluate_saves_rankings_artifact_when_enabled(): + """evaluate(save_rankings=True) writes one JSON artifact per ranking dataset with full per-target scores.""" output_folder = Path("tmp/rankings_artifact_test_enabled") if output_folder.exists(): shutil.rmtree(output_folder, ignore_errors=True) @@ -136,6 +137,7 @@ def test_evaluate_saves_rankings_artifact_when_enabled(): def test_evaluate_does_not_save_rankings_artifact_by_default(): + """evaluate() without save_rankings does not create a rankings/ directory.""" output_folder = Path("tmp/rankings_artifact_test_disabled") if output_folder.exists(): shutil.rmtree(output_folder, ignore_errors=True) From 4dd268162a3d72b91a18206f9d4a1338fb6a84af Mon Sep 17 00:00:00 2001 From: "warre.veys" Date: Sat, 16 May 2026 12:50:54 +0200 Subject: [PATCH 3/7] 1 file per run --- src/workrb/config.py | 27 +++++++++++++++++-------- src/workrb/run.py | 29 ++++++++++++++++++--------- tests/test_save_rankings_artifacts.py | 29 +++++++++++++++++---------- 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/src/workrb/config.py b/src/workrb/config.py index 4d6df53..8464fbd 100644 --- a/src/workrb/config.py +++ b/src/workrb/config.py @@ -156,20 +156,31 @@ def save_rankings_artifact( self, task_name: str, dataset_id: str, - scores_by_target: dict[str, list[float]], + scores: dict[str, dict[str, float]], num_queries: int, num_targets: int, ) -> Path: - """Save full ranking scores for one dataset as a JSON artifact.""" + """Save ranking scores for one dataset as a JSON artifact. + + The on-disk shape mirrors the ground-truth schema: + ``{model_id: {task_id: {language: {num_queries, num_targets, scores}}}}`` + where ``scores`` is ``{query_text: {target_text: score}}``. One file is + written per ``(task, dataset_id)`` so individual datasets can be loaded + without parsing the whole benchmark; consumers wanting a unified view + can merge files by deep-updating their nested dicts. + """ rankings_path = self.get_task_rankings_path(task_name=task_name, dataset_id=dataset_id) rankings_path.parent.mkdir(parents=True, exist_ok=True) payload = { - "model_name": self.model_name, - "task_name": task_name, - "dataset_id": dataset_id, - "num_queries": num_queries, - "num_targets": num_targets, - "scores_by_target": scores_by_target, + self.model_name: { + task_name: { + dataset_id: { + "num_queries": num_queries, + "num_targets": num_targets, + "scores": scores, + } + } + } } with open(rankings_path, "w") as f: json.dump(payload, f, indent=2) diff --git a/src/workrb/run.py b/src/workrb/run.py index 7ca99b1..5eb2f91 100644 --- a/src/workrb/run.py +++ b/src/workrb/run.py @@ -494,11 +494,13 @@ def _run_pending_work( dataset_id=dataset_id, metrics=task_metrics, ) + dataset = task.datasets[dataset_id] rankings_path = config.save_rankings_artifact( task_name=task.name, dataset_id=dataset_id, - scores_by_target=_build_scores_by_target( - target_space=task.datasets[dataset_id].target_space, + scores=_build_scores( + query_texts=dataset.query_texts, + target_space=dataset.target_space, prediction_matrix=prediction_matrix, ), num_queries=prediction_matrix.shape[0], @@ -540,12 +542,21 @@ def _run_pending_work( return results -def _build_scores_by_target( +def _build_scores( + query_texts: list[str], target_space: list[str], prediction_matrix: Any, -) -> dict[str, list[float]]: - """Build a mapping from target text to its scores across all queries.""" - return { - target_text: prediction_matrix[:, idx].tolist() - for idx, target_text in enumerate(target_space) - } +) -> dict[str, dict[str, float]]: + """Build nested ``{query_text: {target_text: score}}`` from a prediction matrix. + + Zero scores are omitted so sparse models (e.g. BM25) stay compact; + consumers should treat a missing target as a score of 0. + """ + matrix = prediction_matrix.tolist() + scores: dict[str, dict[str, float]] = {} + for q_idx, query_text in enumerate(query_texts): + row = matrix[q_idx] + scores[query_text] = { + target_space[t_idx]: score for t_idx, score in enumerate(row) if score != 0 + } + return scores diff --git a/tests/test_save_rankings_artifacts.py b/tests/test_save_rankings_artifacts.py index 74c1fb0..afca551 100644 --- a/tests/test_save_rankings_artifacts.py +++ b/tests/test_save_rankings_artifacts.py @@ -123,17 +123,24 @@ def test_evaluate_saves_rankings_artifact_when_enabled(): with open(ranking_files[0]) as f: payload = json.load(f) - assert payload["model_name"] == model.name - assert payload["task_name"] == "Tiny Ranking Task" - assert payload["dataset_id"] == "en" - assert payload["num_queries"] == 2 - assert payload["num_targets"] == 3 - - scores_by_target = payload["scores_by_target"] - assert set(scores_by_target.keys()) == {"target_a", "target_b", "target_c"} - assert scores_by_target["target_a"] == pytest.approx([0.1, 0.4]) - assert scores_by_target["target_b"] == pytest.approx([0.2, 0.5]) - assert scores_by_target["target_c"] == pytest.approx([0.3, 0.6]) + assert set(payload.keys()) == {model.name} + task_payload = payload[model.name] + assert set(task_payload.keys()) == {"Tiny Ranking Task"} + dataset_payload = task_payload["Tiny Ranking Task"] + assert set(dataset_payload.keys()) == {"en"} + + leaf = dataset_payload["en"] + assert leaf["num_queries"] == 2 + assert leaf["num_targets"] == 3 + + scores = leaf["scores"] + assert set(scores.keys()) == {"query one", "query two"} + assert scores["query one"]["target_a"] == pytest.approx(0.1) + assert scores["query one"]["target_b"] == pytest.approx(0.2) + assert scores["query one"]["target_c"] == pytest.approx(0.3) + assert scores["query two"]["target_a"] == pytest.approx(0.4) + assert scores["query two"]["target_b"] == pytest.approx(0.5) + assert scores["query two"]["target_c"] == pytest.approx(0.6) def test_evaluate_does_not_save_rankings_artifact_by_default(): From de9af5d99bea5c210d9f1cc4db32ed04e3c4155a Mon Sep 17 00:00:00 2001 From: "warre.veys" Date: Fri, 22 May 2026 08:49:49 +0200 Subject: [PATCH 4/7] Standardize the save + implementation of a reading counterpart --- CONTRIBUTING.md | 1 + src/workrb/__init__.py | 11 +- src/workrb/config.py | 96 ++++++--- src/workrb/rankings.py | 195 +++++++++++++++++ src/workrb/run.py | 290 ++++++++++++++++++++++---- tests/test_save_rankings_artifacts.py | 265 +++++++++++++++++++---- 6 files changed, 751 insertions(+), 107 deletions(-) create mode 100644 src/workrb/rankings.py diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 595b7e6..2b3e23b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -113,6 +113,7 @@ Make a pull request (PR) from your fork into the main branch of WorkRB, followin - [ ] Code follows project style guidelines - [ ] Documentation updated - [ ] No new warnings introduced + - [ ] If the rankings artifact schema changed: bumped SCHEMA_VERSION in workrb/rankings.py and updated SUPPORTED_SCHEMA_VERSIONS ``` ### 4. Review Process diff --git a/src/workrb/__init__.py b/src/workrb/__init__.py index 026ce95..c1b85ff 100644 --- a/src/workrb/__init__.py +++ b/src/workrb/__init__.py @@ -4,9 +4,15 @@ from workrb import data, metrics, models, tasks from workrb.logging import setup_logger +from workrb.rankings import RankingsArtifactInvalid, RankingsArtifactMissing from workrb.registry import list_available_tasks from workrb.results import load_results -from workrb.run import evaluate, evaluate_multiple_models, get_tasks_overview +from workrb.run import ( + evaluate, + evaluate_multiple_models, + evaluate_rankings, + get_tasks_overview, +) from workrb.types import ExecutionMode, LanguageAggregationMode # Configure 'workrb' logger to INFO level by default, by usage of package @@ -15,9 +21,12 @@ __all__ = [ "ExecutionMode", "LanguageAggregationMode", + "RankingsArtifactInvalid", + "RankingsArtifactMissing", "data", "evaluate", "evaluate_multiple_models", + "evaluate_rankings", "get_tasks_overview", "list_available_tasks", "load_results", diff --git a/src/workrb/config.py b/src/workrb/config.py index 8464fbd..27887c9 100644 --- a/src/workrb/config.py +++ b/src/workrb/config.py @@ -12,17 +12,38 @@ from collections.abc import Sequence from dataclasses import asdict, dataclass, field from datetime import datetime, timezone +from importlib.metadata import PackageNotFoundError +from importlib.metadata import version as _pkg_version from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any +import numpy as np import yaml +from workrb.rankings import SCHEMA_VERSION from workrb.results import BenchmarkResults from workrb.tasks.abstract import Task +if TYPE_CHECKING: + from workrb.tasks.abstract.ranking_base import RankingDataset + logger = logging.getLogger(__name__) +def _get_workrb_version() -> str: + try: + return _pkg_version("workrb") + except PackageNotFoundError: + return "unknown" + + +def rankings_filename(task_name: str, dataset_id: str) -> str: + """Filename used for the artifact of one ``(task, dataset_id)`` pair.""" + safe_task = re.sub(r"[^A-Za-z0-9_.-]+", "_", task_name).strip("_") + safe_dataset = re.sub(r"[^A-Za-z0-9_.-]+", "_", dataset_id).strip("_") + return f"{safe_task}__{safe_dataset}.json" + + @dataclass class BenchmarkConfig: """ @@ -147,40 +168,65 @@ def get_rankings_dir(self) -> Path: def get_task_rankings_path(self, task_name: str, dataset_id: str) -> Path: """Get the output path for one task/dataset ranking artifact.""" - safe_task_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", task_name).strip("_") - safe_dataset_id = re.sub(r"[^A-Za-z0-9_.-]+", "_", dataset_id).strip("_") - filename = f"{safe_task_name}__{safe_dataset_id}.json" - return self.get_rankings_dir() / filename + return self.get_rankings_dir() / rankings_filename(task_name, dataset_id) def save_rankings_artifact( self, task_name: str, dataset_id: str, - scores: dict[str, dict[str, float]], - num_queries: int, - num_targets: int, + split: str, + dataset: "RankingDataset", + prediction_matrix: np.ndarray, ) -> Path: - """Save ranking scores for one dataset as a JSON artifact. - - The on-disk shape mirrors the ground-truth schema: - ``{model_id: {task_id: {language: {num_queries, num_targets, scores}}}}`` - where ``scores`` is ``{query_text: {target_text: score}}``. One file is - written per ``(task, dataset_id)`` so individual datasets can be loaded - without parsing the whole benchmark; consumers wanting a unified view - can merge files by deep-updating their nested dicts. + """Save the prediction matrix for one ``(task, dataset_id)`` as a JSON artifact. + + Schema: + ``{"header": {...metadata...}, "scores": {q_idx: {t_idx: score}}}`` + Query and target keys are positional indices (stringified, since JSON + object keys must be strings); the dataset's row order at the pinned + workrb version is the implicit ID source. Zero scores are omitted so + sparse models stay compact; consumers treat missing targets as 0. """ + workrb_version = _get_workrb_version() + + num_queries, num_targets = prediction_matrix.shape + if num_queries != len(dataset.query_texts): + raise ValueError( + f"prediction_matrix has {num_queries} rows but dataset has " + f"{len(dataset.query_texts)} queries" + ) + if num_targets != len(dataset.target_space): + raise ValueError( + f"prediction_matrix has {num_targets} cols but dataset has " + f"{len(dataset.target_space)} targets" + ) + rankings_path = self.get_task_rankings_path(task_name=task_name, dataset_id=dataset_id) rankings_path.parent.mkdir(parents=True, exist_ok=True) - payload = { - self.model_name: { - task_name: { - dataset_id: { - "num_queries": num_queries, - "num_targets": num_targets, - "scores": scores, - } - } + + scores: dict[str, dict[str, float]] = {} + matrix = prediction_matrix.tolist() + for q_idx, row in enumerate(matrix): + scores[str(q_idx)] = { + str(t_idx): float(score) for t_idx, score in enumerate(row) if score != 0 } + + payload = { + "header": { + "schema_version": SCHEMA_VERSION, + "workrb_version": workrb_version, + "model_name": self.model_name, + "task_name": task_name, + "dataset_id": dataset_id, + "split": split, + "num_queries": int(num_queries), + "num_targets": int(num_targets), + "first_query_text": dataset.query_texts[0], + "last_query_text": dataset.query_texts[-1], + "first_target_text": dataset.target_space[0], + "last_target_text": dataset.target_space[-1], + }, + "scores": scores, } with open(rankings_path, "w") as f: json.dump(payload, f, indent=2) diff --git a/src/workrb/rankings.py b/src/workrb/rankings.py new file mode 100644 index 0000000..2ed58d7 --- /dev/null +++ b/src/workrb/rankings.py @@ -0,0 +1,195 @@ +"""Loading and validation utilities for ranking artifacts. + +A ranking artifact is the JSON file produced by +:meth:`BenchmarkConfig.save_rankings_artifact` for one ``(task, dataset_id)`` +pair. This module provides the read-side counterparts used by +:func:`workrb.evaluate_rankings` to replay metrics without re-running a model. +""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from workrb.tasks.abstract.ranking_base import RankingDataset + +logger = logging.getLogger(__name__) + +SCHEMA_VERSION = 1 +"""Current on-disk schema version for ranking artifacts. + +Bumped when the JSON shape itself changes (field renames, new required fields, +restructuring of `scores`). Independent of ``workrb_version``: a workrb release +that does not change the schema keeps the same ``SCHEMA_VERSION``. +""" + +SUPPORTED_SCHEMA_VERSIONS: frozenset[int] = frozenset({1}) +"""Schema versions this reader can parse. Artifacts with any other +``schema_version`` are hard-rejected by :func:`validate_header`.""" + +SCHEMA_HEADER_FIELDS: dict[int, frozenset[str]] = { + 1: frozenset( + { + "schema_version", + "workrb_version", + "model_name", + "task_name", + "dataset_id", + "split", + "num_queries", + "num_targets", + "first_query_text", + "last_query_text", + "first_target_text", + "last_target_text", + } + ), +} +"""Pinned header field set per schema version. + +This is the source of truth that :func:`workrb.config.BenchmarkConfig.save_rankings_artifact` +must agree with. A test (``test_writer_header_matches_pinned_schema``) compares +freshly written headers against this mapping; mismatch means the writer changed +without bumping :data:`SCHEMA_VERSION`. When evolving the schema: + +1. Add a new entry ``SCHEMA_HEADER_FIELDS[N] = frozenset({...})``. +2. Bump :data:`SCHEMA_VERSION` to ``N``. +3. Decide whether to keep the old version in :data:`SUPPORTED_SCHEMA_VERSIONS` + (back-compat) or drop it (force rewrite). +""" + + +class RankingsArtifactMissing(FileNotFoundError): + """Raised when a needed ranking artifact file is not on disk.""" + + def __init__(self, path: Path, task_name: str, dataset_id: str): + self.path = path + self.task_name = task_name + self.dataset_id = dataset_id + super().__init__( + f"No ranking artifact for task '{task_name}', dataset '{dataset_id}' at {path}" + ) + + +class RankingsArtifactInvalid(ValueError): + """Raised when a ranking artifact does not match the current dataset.""" + + def __init__(self, path: Path, reason: str): + self.path = path + self.reason = reason + super().__init__(f"Invalid ranking artifact at {path}: {reason}") + + +def load_rankings_artifact(path: Path) -> tuple[dict, dict[int, dict[int, float]]]: + """Load a ranking artifact and return ``(header, scores)`` with integer keys. + + JSON object keys are always strings, so both axes are cast back to ``int``. + """ + with open(path) as f: + payload = json.load(f) + + if "header" not in payload or "scores" not in payload: + raise RankingsArtifactInvalid(path, "missing 'header' or 'scores' top-level keys") + + header = payload["header"] + raw_scores = payload["scores"] + scores: dict[int, dict[int, float]] = { + int(q): {int(t): float(s) for t, s in row.items()} for q, row in raw_scores.items() + } + return header, scores + + +def materialize_prediction_matrix( + scores: dict[int, dict[int, float]], + num_queries: int, + num_targets: int, +) -> np.ndarray: + """Build a dense ``(num_queries, num_targets)`` matrix from sparse scores.""" + matrix = np.zeros((num_queries, num_targets), dtype=np.float32) + for q_idx, row in scores.items(): + for t_idx, score in row.items(): + matrix[q_idx, t_idx] = score + return matrix + + +def validate_header( + header: dict, + dataset: RankingDataset, + task_name: str, + dataset_id: str, + split: str, + path: Path, + running_workrb_version: str, +) -> None: + """Validate an artifact header against the live dataset. + + Checks are run in three tiers: + + 1. ``schema_version`` must be in :data:`SUPPORTED_SCHEMA_VERSIONS`. This + gates whether the reader can parse the file at all; an unknown schema + version is a hard reject. + 2. Structural identity (task name, dataset id, split, sizes, canary + strings) must match the live dataset. Any mismatch is a hard reject. + 3. ``workrb_version`` is informational: a difference logs a warning but + does not block scoring. Cross-version replay is the intended use case + for :func:`workrb.evaluate_rankings` (metrics may have changed); the + real safety net is tier 2. + """ + schema_version = header.get("schema_version") + if schema_version not in SUPPORTED_SCHEMA_VERSIONS: + raise RankingsArtifactInvalid( + path, + f"unsupported schema_version {schema_version!r}; this reader " + f"supports {sorted(SUPPORTED_SCHEMA_VERSIONS)}", + ) + + if header.get("task_name") != task_name: + raise RankingsArtifactInvalid( + path, + f"header task_name '{header.get('task_name')}' does not match task '{task_name}'", + ) + if header.get("dataset_id") != dataset_id: + raise RankingsArtifactInvalid( + path, + f"header dataset_id '{header.get('dataset_id')}' does not match dataset '{dataset_id}'", + ) + if header.get("split") != split: + raise RankingsArtifactInvalid( + path, + f"header split '{header.get('split')}' does not match task split '{split}'", + ) + if header.get("num_queries") != len(dataset.query_texts): + raise RankingsArtifactInvalid( + path, + f"num_queries mismatch: header={header.get('num_queries')}, " + f"dataset={len(dataset.query_texts)}", + ) + if header.get("num_targets") != len(dataset.target_space): + raise RankingsArtifactInvalid( + path, + f"num_targets mismatch: header={header.get('num_targets')}, " + f"dataset={len(dataset.target_space)}", + ) + if header.get("first_query_text") != dataset.query_texts[0]: + raise RankingsArtifactInvalid(path, "first_query_text canary mismatch") + if header.get("last_query_text") != dataset.query_texts[-1]: + raise RankingsArtifactInvalid(path, "last_query_text canary mismatch") + if header.get("first_target_text") != dataset.target_space[0]: + raise RankingsArtifactInvalid(path, "first_target_text canary mismatch") + if header.get("last_target_text") != dataset.target_space[-1]: + raise RankingsArtifactInvalid(path, "last_target_text canary mismatch") + + header_version = header.get("workrb_version") + if header_version != running_workrb_version: + logger.warning( + "Ranking artifact %s was written by workrb %s, current is %s. " + "Sizes and canaries match, proceeding.", + path, + header_version, + running_workrb_version, + ) diff --git a/src/workrb/run.py b/src/workrb/run.py index 5eb2f91..6efc9c4 100644 --- a/src/workrb/run.py +++ b/src/workrb/run.py @@ -9,12 +9,20 @@ import time from collections import Counter from collections.abc import Sequence +from pathlib import Path from typing import Any -from workrb.config import BenchmarkConfig +from workrb.config import BenchmarkConfig, _get_workrb_version, rankings_filename from workrb.logging import setup_logger from workrb.metrics.reporting import format_results from workrb.models.base import ModelInterface +from workrb.rankings import ( + RankingsArtifactInvalid, + RankingsArtifactMissing, + load_rankings_artifact, + materialize_prediction_matrix, + validate_header, +) from workrb.results import ( BenchmarkMetadata, BenchmarkResults, @@ -200,6 +208,209 @@ def evaluate_multiple_models( return all_results +def evaluate_rankings( + rankings_dir: str | Path, + tasks: Sequence[Task], + output_folder: str, + metrics: dict[str, list[str]] | None = None, + description: str = "", + language_aggregation_mode: LanguageAggregationMode = LanguageAggregationMode.MONOLINGUAL_ONLY, + execution_mode: ExecutionMode = ExecutionMode.LAZY, +) -> BenchmarkResults: + """Compute benchmark metrics by replaying saved ranking artifacts. + + No model is required. ``rankings_dir`` is expected to be the + ``/rankings//`` directory produced by a + previous ``evaluate(..., save_rankings=True)`` run, or by an external + submission that wrote the same schema. + + Args: + rankings_dir: Directory containing per-``(task, dataset_id)`` JSON + artifacts produced by :meth:`BenchmarkConfig.save_rankings_artifact`. + tasks: Tasks to score. Non-ranking tasks are skipped with a warning. + output_folder: Where to write ``results.json``, ``config.yaml``, and + ``checkpoint.json`` (same layout as :func:`evaluate`). + metrics: Optional dict mapping task names to custom metrics lists. + description: Description for the benchmark run. + language_aggregation_mode: How per-language results should be grouped + in the returned ``BenchmarkResults``. + execution_mode: ``LAZY`` skips datasets incompatible with the chosen + aggregation mode, ``ALL`` scores everything for which an artifact + exists. + + Returns + ------- + ``BenchmarkResults`` populated from the artifacts. + + Raises + ------ + RankingsArtifactMissing: An expected artifact is not on disk. + RankingsArtifactInvalid: An artifact's header does not match the live + dataset, or multiple model_names are present in ``rankings_dir``. + + Notes + ----- + Cross-version replay (an artifact produced by a different ``workrb_version``) + is supported and is in fact the main reason this entry point exists: it + lets you recompute metrics after metric definitions change without rerunning + the model. Such a version drift only logs a warning. The real safety net + is :func:`workrb.rankings.validate_header`'s structural and canary checks, + which catch any change to dataset construction (deduplication, + postprocessing, source data) and raise :class:`RankingsArtifactInvalid`. + An unknown ``schema_version`` is a hard reject regardless of workrb + version. + """ + rankings_dir = Path(rankings_dir) + if not rankings_dir.is_dir(): + raise FileNotFoundError( + f"rankings_dir does not exist or is not a directory: {rankings_dir}" + ) + + model_name = _peek_model_name(rankings_dir) + + config = BenchmarkConfig( + model_name=model_name, + task_configs=[task.get_task_config() for task in tasks], + languages=sorted({lang.value for task in tasks for lang in task.languages}), + custom_metrics=metrics, + output_folder=output_folder, + description=description, + ) + + if execution_mode == ExecutionMode.LAZY: + dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate(tasks, language_aggregation_mode) + else: + dataset_ids_to_evaluate = {task.name: list(task.dataset_ids) for task in tasks} + + key_metrics_by_task_group = {task.task_group.value: task.default_metrics for task in tasks} + results = BenchmarkResults( + task_results={}, + metadata=BenchmarkMetadata( + model_name=model_name, + total_evaluation_time=0.0, + timestamp=time.time(), + num_tasks=len(tasks), + languages=_get_all_languages(tasks), + resumed_from_checkpoint=False, + language_aggregation_mode=language_aggregation_mode.value, + ), + key_metrics_by_task_group=key_metrics_by_task_group, + ) + + running_version = _get_workrb_version() + start_time_benchmark = time.time() + logger.info(f"Replaying rankings from {rankings_dir} (model: {model_name})") + + for task in tasks: + if not isinstance(task, RankingTask): + logger.warning( + "Skipping task '%s': evaluate_rankings only supports RankingTask.", task.name + ) + continue + + scoped = dataset_ids_to_evaluate.get(task.name, []) + if not scoped: + continue + + if task.name not in results.task_results: + results.task_results[task.name] = TaskResults( + metadata=TaskResultMetadata( + task_group=task.task_group.value, + task_type=task.task_type.value, + label_type=task.label_type.value, + description=task.description, + split=task.split.value, + ), + datasetid_results={}, + ) + + logger.info(f"{'=' * 60}") + logger.info(f"Scoring task: {task.name}") + + for dataset_id in scoped: + path = rankings_dir / rankings_filename(task.name, dataset_id) + if not path.exists(): + raise RankingsArtifactMissing(path, task.name, dataset_id) + + header, scores = load_rankings_artifact(path) + if header.get("model_name") != model_name: + raise RankingsArtifactInvalid( + path, + f"model_name mismatch with sibling artifacts: header has " + f"'{header.get('model_name')}', expected '{model_name}'", + ) + dataset = task.datasets[dataset_id] + validate_header( + header=header, + dataset=dataset, + task_name=task.name, + dataset_id=dataset_id, + split=task.split.value, + path=path, + running_workrb_version=running_version, + ) + + start_time_eval = time.time() + prediction_matrix = materialize_prediction_matrix( + scores=scores, + num_queries=header["num_queries"], + num_targets=header["num_targets"], + ) + task_metrics = metrics.get(task.name) if metrics else None + metrics_dict = task.compute_metrics_from_prediction_matrix( + prediction_matrix=prediction_matrix, + dataset_id=dataset_id, + metrics=task_metrics, + ) + evaluation_time = time.time() - start_time_eval + + logger.info(f"* {dataset_id} ({task.get_size_oneliner(dataset_id)})") + _record_dataset_result( + results=results, + config=config, + task=task, + dataset_id=dataset_id, + metrics_dict=metrics_dict, + evaluation_time=evaluation_time, + ) + + results.metadata.total_evaluation_time = time.time() - start_time_benchmark + config.save_final_result_artifacts(results) + + logger.info(f"{'=' * 60}") + logger.info("✓ evaluate_rankings COMPLETE") + logger.info(f"Total time: {results.metadata.total_evaluation_time:.2f}s") + logger.info( + format_results( + results, + display_per_task=False, + display_per_task_group=False, + display_per_language=False, + display_overall=True, + language_aggregation_mode=language_aggregation_mode, + ) + ) + logger.info(f"{'=' * 60}") + return results + + +def _peek_model_name(rankings_dir: Path) -> str: + """Read ``model_name`` from the first artifact in ``rankings_dir``. + + All artifacts in one directory must agree on ``model_name``; later + artifacts that disagree raise :class:`RankingsArtifactInvalid` when + they are loaded for scoring. + """ + json_files = sorted(rankings_dir.glob("*.json")) + if not json_files: + raise FileNotFoundError(f"No ranking artifacts (*.json) found in {rankings_dir}") + header, _ = load_rankings_artifact(json_files[0]) + model_name = header.get("model_name") + if not model_name: + raise RankingsArtifactInvalid(json_files[0], "header missing 'model_name'") + return model_name + + def get_tasks_overview( tasks: Sequence[Task], dataset_ids_to_evaluate: dict[str, list[str]] | None = None, @@ -494,17 +705,12 @@ def _run_pending_work( dataset_id=dataset_id, metrics=task_metrics, ) - dataset = task.datasets[dataset_id] rankings_path = config.save_rankings_artifact( task_name=task.name, dataset_id=dataset_id, - scores=_build_scores( - query_texts=dataset.query_texts, - target_space=dataset.target_space, - prediction_matrix=prediction_matrix, - ), - num_queries=prediction_matrix.shape[0], - num_targets=prediction_matrix.shape[1], + split=task.split.value, + dataset=task.datasets[dataset_id], + prediction_matrix=prediction_matrix, ) logger.info(f"\tSaved ranking scores to: {rankings_path}") else: @@ -513,26 +719,14 @@ def _run_pending_work( ) evaluation_time = time.time() - start_time_eval - # Store results - dataset_languages = task.get_dataset_languages(dataset_id) - results.task_results[task.name].datasetid_results[dataset_id] = MetricsResult( - evaluation_time=evaluation_time, + _record_dataset_result( + results=results, + config=config, + task=task, + dataset_id=dataset_id, metrics_dict=dataset_results, - input_languages=sorted( - lang.value for lang in dataset_languages.input_languages - ), - output_languages=sorted( - lang.value for lang in dataset_languages.output_languages - ), + evaluation_time=evaluation_time, ) - - # Save incremental results to checkpoint - if config: - config.save_results_checkpoint(results) - - # Show key metrics - key_metric = task.default_metrics[0] - logger.info(f"\t{key_metric}: {dataset_results[key_metric]:.3f}") run_idx += 1 except Exception as e: logger.error(f"Error: {e}") @@ -542,21 +736,29 @@ def _run_pending_work( return results -def _build_scores( - query_texts: list[str], - target_space: list[str], - prediction_matrix: Any, -) -> dict[str, dict[str, float]]: - """Build nested ``{query_text: {target_text: score}}`` from a prediction matrix. - - Zero scores are omitted so sparse models (e.g. BM25) stay compact; - consumers should treat a missing target as a score of 0. +def _record_dataset_result( + results: BenchmarkResults, + config: BenchmarkConfig, + task: Task, + dataset_id: str, + metrics_dict: dict[str, float], + evaluation_time: float, +) -> None: + """Attach a per-dataset metrics result, checkpoint, and log the key metric. + + Shared between :func:`evaluate` and :func:`evaluate_rankings` so both paths + produce identical ``BenchmarkResults`` bookkeeping. """ - matrix = prediction_matrix.tolist() - scores: dict[str, dict[str, float]] = {} - for q_idx, query_text in enumerate(query_texts): - row = matrix[q_idx] - scores[query_text] = { - target_space[t_idx]: score for t_idx, score in enumerate(row) if score != 0 - } - return scores + dataset_languages = task.get_dataset_languages(dataset_id) + results.task_results[task.name].datasetid_results[dataset_id] = MetricsResult( + evaluation_time=evaluation_time, + metrics_dict=metrics_dict, + input_languages=sorted(lang.value for lang in dataset_languages.input_languages), + output_languages=sorted(lang.value for lang in dataset_languages.output_languages), + ) + + if config: + config.save_results_checkpoint(results) + + key_metric = task.default_metrics[0] + logger.info(f"\t{key_metric}: {metrics_dict[key_metric]:.3f}") diff --git a/tests/test_save_rankings_artifacts.py b/tests/test_save_rankings_artifacts.py index afca551..8a11d2f 100644 --- a/tests/test_save_rankings_artifacts.py +++ b/tests/test_save_rankings_artifacts.py @@ -1,6 +1,7 @@ -"""Tests for ranking artifact persistence in evaluate(save_rankings=...).""" +"""Tests for ranking artifact persistence and replay.""" import json +import logging import shutil from pathlib import Path @@ -9,6 +10,12 @@ import workrb from workrb.models.base import ModelInterface +from workrb.rankings import ( + SCHEMA_HEADER_FIELDS, + SCHEMA_VERSION, + RankingsArtifactInvalid, + RankingsArtifactMissing, +) from workrb.tasks.abstract.base import DatasetSplit, LabelType, Language from workrb.tasks.abstract.ranking_base import RankingDataset, RankingTask, RankingTaskGroup from workrb.types import ModelInputType @@ -76,10 +83,10 @@ def _compute_rankings( query_input_type: ModelInputType, target_input_type: ModelInputType, ) -> torch.Tensor: - # 2 queries x 3 targets + # 2 queries x 3 targets, with one explicit zero to exercise sparsity return torch.tensor( [ - [0.1, 0.2, 0.3], + [0.1, 0.0, 0.3], [0.4, 0.5, 0.6], ], dtype=torch.float32, @@ -99,65 +106,249 @@ def classification_label_space(self) -> list[str] | None: return None -def test_evaluate_saves_rankings_artifact_when_enabled(): - """evaluate(save_rankings=True) writes one JSON artifact per ranking dataset with full per-target scores.""" - output_folder = Path("tmp/rankings_artifact_test_enabled") - if output_folder.exists(): - shutil.rmtree(output_folder, ignore_errors=True) +def _fresh_dir(name: str) -> Path: + path = Path("tmp") / name + if path.exists(): + shutil.rmtree(path, ignore_errors=True) + return path + +def _run_and_get_rankings_dir(output_folder: Path) -> Path: model = TinyDeterministicModel() tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] - - _ = workrb.evaluate( + workrb.evaluate( model=model, tasks=tasks, output_folder=str(output_folder), force_restart=True, save_rankings=True, ) + return output_folder / "rankings" / model.name + + +def test_evaluate_saves_rankings_artifact_with_new_schema(): + """evaluate(save_rankings=True) writes one JSON per ranking dataset with header + sparse int-keyed scores.""" + output_folder = _fresh_dir("rankings_artifact_test_enabled") + rankings_dir = _run_and_get_rankings_dir(output_folder) - rankings_dir = output_folder / "rankings" / model.name ranking_files = list(rankings_dir.glob("*.json")) assert len(ranking_files) == 1 with open(ranking_files[0]) as f: payload = json.load(f) - assert set(payload.keys()) == {model.name} - task_payload = payload[model.name] - assert set(task_payload.keys()) == {"Tiny Ranking Task"} - dataset_payload = task_payload["Tiny Ranking Task"] - assert set(dataset_payload.keys()) == {"en"} - - leaf = dataset_payload["en"] - assert leaf["num_queries"] == 2 - assert leaf["num_targets"] == 3 - - scores = leaf["scores"] - assert set(scores.keys()) == {"query one", "query two"} - assert scores["query one"]["target_a"] == pytest.approx(0.1) - assert scores["query one"]["target_b"] == pytest.approx(0.2) - assert scores["query one"]["target_c"] == pytest.approx(0.3) - assert scores["query two"]["target_a"] == pytest.approx(0.4) - assert scores["query two"]["target_b"] == pytest.approx(0.5) - assert scores["query two"]["target_c"] == pytest.approx(0.6) + assert set(payload.keys()) == {"header", "scores"} + + header = payload["header"] + assert header["schema_version"] == 1 + assert header["model_name"] == "tiny-deterministic-model" + assert header["task_name"] == "Tiny Ranking Task" + assert header["dataset_id"] == "en" + assert header["split"] == "test" + assert header["num_queries"] == 2 + assert header["num_targets"] == 3 + assert header["first_query_text"] == "query one" + assert header["last_query_text"] == "query two" + assert header["first_target_text"] == "target_a" + assert header["last_target_text"] == "target_c" + assert isinstance(header["workrb_version"], str) and header["workrb_version"] + + scores = payload["scores"] + assert set(scores.keys()) == {"0", "1"} + # Row 0 has a zero on target index 1 (sparse omission) + assert set(scores["0"].keys()) == {"0", "2"} + assert scores["0"]["0"] == pytest.approx(0.1) + assert scores["0"]["2"] == pytest.approx(0.3) + assert set(scores["1"].keys()) == {"0", "1", "2"} + assert scores["1"]["0"] == pytest.approx(0.4) + assert scores["1"]["1"] == pytest.approx(0.5) + assert scores["1"]["2"] == pytest.approx(0.6) def test_evaluate_does_not_save_rankings_artifact_by_default(): """evaluate() without save_rankings does not create a rankings/ directory.""" - output_folder = Path("tmp/rankings_artifact_test_disabled") - if output_folder.exists(): - shutil.rmtree(output_folder, ignore_errors=True) - + output_folder = _fresh_dir("rankings_artifact_test_disabled") model = TinyDeterministicModel() tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] - - _ = workrb.evaluate( + workrb.evaluate( model=model, tasks=tasks, output_folder=str(output_folder), force_restart=True, ) - rankings_dir = output_folder / "rankings" - assert not rankings_dir.exists() + assert not (output_folder / "rankings").exists() + + +def test_evaluate_rankings_roundtrip_parity(): + """evaluate_rankings on saved artifacts reproduces metrics from evaluate(save_rankings=True).""" + write_dir = _fresh_dir("rankings_artifact_roundtrip_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + + with open(write_dir / "results.json") as f: + write_results = json.load(f) + write_metrics = write_results["task_results"]["Tiny Ranking Task"]["datasetid_results"]["en"][ + "metrics_dict" + ] + + read_output = _fresh_dir("rankings_artifact_roundtrip_read") + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + replay = workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + replay_metrics = replay.task_results["Tiny Ranking Task"].datasetid_results["en"].metrics_dict + assert set(replay_metrics.keys()) == set(write_metrics.keys()) + for key, value in write_metrics.items(): + assert replay_metrics[key] == pytest.approx(value) + + +def test_evaluate_rankings_missing_artifact_raises(): + """A missing artifact file raises RankingsArtifactMissing.""" + write_dir = _fresh_dir("rankings_artifact_missing_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + + # Remove the only artifact, then add a stub so the directory peek succeeds + # but the per-task lookup fails. + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + stub_payload = json.loads(files[0].read_text()) + files[0].unlink() + decoy = rankings_dir / "Some_Other_Task__xx.json" + decoy.write_text(json.dumps(stub_payload)) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_missing_read") + with pytest.raises(RankingsArtifactMissing): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def _hand_edit_header(rankings_dir: Path, **header_overrides) -> Path: + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + path = files[0] + payload = json.loads(path.read_text()) + payload["header"].update(header_overrides) + path.write_text(json.dumps(payload)) + return path + + +def test_evaluate_rankings_size_mismatch_raises(): + write_dir = _fresh_dir("rankings_artifact_size_mismatch_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, num_queries=99) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_size_mismatch_read") + with pytest.raises(RankingsArtifactInvalid, match="num_queries"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_canary_mismatch_raises(): + write_dir = _fresh_dir("rankings_artifact_canary_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, first_query_text="this is not the original query") + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_canary_read") + with pytest.raises(RankingsArtifactInvalid, match="first_query_text"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_split_mismatch_raises(): + """An artifact written for one split cannot be replayed against another.""" + write_dir = _fresh_dir("rankings_artifact_split_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, split="val") + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_split_read") + with pytest.raises(RankingsArtifactInvalid, match="split"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_writer_header_matches_pinned_schema(): + """The writer must emit exactly the header fields pinned in SCHEMA_HEADER_FIELDS. + + This is the gate against silently changing the on-disk schema. If you add, + remove, or rename a header field, bump SCHEMA_VERSION and add a new entry + to SCHEMA_HEADER_FIELDS rather than editing the current one in place. + """ + write_dir = _fresh_dir("rankings_artifact_pinned_schema") + rankings_dir = _run_and_get_rankings_dir(write_dir) + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + payload = json.loads(files[0].read_text()) + + written_fields = frozenset(payload["header"].keys()) + pinned_fields = SCHEMA_HEADER_FIELDS[SCHEMA_VERSION] + assert written_fields == pinned_fields, ( + f"writer header drift from SCHEMA_VERSION={SCHEMA_VERSION}: " + f"added={sorted(written_fields - pinned_fields)}, " + f"removed={sorted(pinned_fields - written_fields)}. " + "Bump SCHEMA_VERSION and add a new entry in SCHEMA_HEADER_FIELDS." + ) + + +def test_evaluate_rankings_unknown_schema_version_raises(): + """An unknown schema_version is a hard reject (independent of workrb_version).""" + write_dir = _fresh_dir("rankings_artifact_schema_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, schema_version=999) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_schema_read") + with pytest.raises(RankingsArtifactInvalid, match="schema_version"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_version_mismatch_warns_but_proceeds(): + """workrb_version mismatch only logs a warning; metrics still computed.""" + write_dir = _fresh_dir("rankings_artifact_version_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, workrb_version="0.0.0-test-fixture") + + records: list[logging.LogRecord] = [] + + class _Capture(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + records.append(record) + + capture = _Capture(level=logging.WARNING) + rankings_logger = logging.getLogger("workrb.rankings") + rankings_logger.addHandler(capture) + try: + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_version_read") + replay = workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + finally: + rankings_logger.removeHandler(capture) + + assert any("0.0.0-test-fixture" in r.getMessage() for r in records) + assert "Tiny Ranking Task" in replay.task_results From 2a84f1c6d09d8c317a924f57eaf6c203d1374d9c Mon Sep 17 00:00:00 2001 From: "warre.veys" Date: Fri, 22 May 2026 12:29:07 +0200 Subject: [PATCH 5/7] documentation enhancement --- README.md | 34 +++++++++++++++++++++++++++++--- src/workrb/config.py | 7 +++++-- src/workrb/run.py | 33 +++++++++++++++++++------------ tests/test_multi_dataset_task.py | 23 ++++++++++++++------- 4 files changed, 72 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 41e4df4..d9fdf95 100644 --- a/README.md +++ b/README.md @@ -219,9 +219,37 @@ results/my_model/ └── __.json ``` -Each JSON contains `model_name`, `task_name`, `dataset_id`, `num_queries`, -`num_targets`, and `scores_by_target` (a mapping from target text to its -score across all queries). +Each JSON file has two top-level keys: + +- `header`: identifies the artifact and pins it to a specific dataset + shape. Includes `schema_version`, `workrb_version`, `model_name`, + `task_name`, `dataset_id`, `split`, `num_queries`, `num_targets`, + plus four canary strings (`first_query_text`, `last_query_text`, + `first_target_text`, `last_target_text`) used to detect silent dataset + drift on replay. +- `scores`: a sparse `{query_index: {target_index: score}}` mapping, with + keys as positional indices into the live dataset's `query_texts` / + `target_space` (stringified, as JSON requires string keys). Scores of + exactly `0.0` are omitted; consumers materialize missing entries as + `0.0`. + +Once written, you can recompute metrics without rerunning the model by +pointing `evaluate_rankings` at the model-scoped directory: + +```python +results = workrb.evaluate_rankings( + rankings_dir="results/my_model/rankings/my_model", + tasks=tasks, + output_folder="results/my_model_replay", +) +``` + +This is the recommended way to re-score after a metric definition has +changed (e.g. a new ranking metric is added in a workrb release): replay +is cheap, the model never runs again, and `validate_header` rejects any +artifact whose dataset shape no longer matches the live one. A +`workrb_version` mismatch only logs a warning; an unknown +`schema_version` is a hard reject. To load & parse results from a run: diff --git a/src/workrb/config.py b/src/workrb/config.py index 27887c9..1a0292f 100644 --- a/src/workrb/config.py +++ b/src/workrb/config.py @@ -184,8 +184,11 @@ def save_rankings_artifact( ``{"header": {...metadata...}, "scores": {q_idx: {t_idx: score}}}`` Query and target keys are positional indices (stringified, since JSON object keys must be strings); the dataset's row order at the pinned - workrb version is the implicit ID source. Zero scores are omitted so - sparse models stay compact; consumers treat missing targets as 0. + workrb version is the implicit ID source. + + Sparsity convention: any score that equals exactly ``0.0`` is omitted + from the row, and consumers materialize missing ``(q, t)`` entries as + ``0.0`` (see :func:`workrb.rankings.materialize_prediction_matrix`). """ workrb_version = _get_workrb_version() diff --git a/src/workrb/run.py b/src/workrb/run.py index 6efc9c4..5c5d5f8 100644 --- a/src/workrb/run.py +++ b/src/workrb/run.py @@ -93,10 +93,9 @@ def evaluate( ) # Determine which datasets are in scope for this run - if execution_mode == ExecutionMode.LAZY: - dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate(tasks, language_aggregation_mode) - else: - dataset_ids_to_evaluate = {task.name: list(task.dataset_ids) for task in tasks} + dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate( + tasks, language_aggregation_mode, execution_mode + ) pending_work = _filter_pending_work(pending_work, dataset_ids_to_evaluate) total_evaluations = sum(len(dids) for dids in dataset_ids_to_evaluate.values()) @@ -277,10 +276,9 @@ def evaluate_rankings( description=description, ) - if execution_mode == ExecutionMode.LAZY: - dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate(tasks, language_aggregation_mode) - else: - dataset_ids_to_evaluate = {task.name: list(task.dataset_ids) for task in tasks} + dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate( + tasks, language_aggregation_mode, execution_mode + ) key_metrics_by_task_group = {task.task_group.value: task.default_metrics for task in tasks} results = BenchmarkResults( @@ -485,19 +483,25 @@ def _get_all_languages(tasks: Sequence[Task]) -> list[str]: def _get_dataset_ids_to_evaluate( tasks: Sequence[Task], language_aggregation_mode: LanguageAggregationMode, + execution_mode: ExecutionMode, ) -> dict[str, list[str]]: - """Compute which dataset IDs per task are compatible with the aggregation mode. + """Compute which dataset IDs per task are in scope for this run. - This is the single source of truth for the run's scope when - ``execution_mode`` is ``LAZY``. The returned dict drives the overview - display, total-evaluation count, and pending-work filtering. + Single source of truth for run scope. The returned dict drives the + overview display, total-evaluation count, and pending-work filtering. + Under ``ExecutionMode.ALL`` every dataset is in scope; under + ``ExecutionMode.LAZY`` datasets incompatible with the chosen + aggregation mode are dropped (and a warning is logged for each). Parameters ---------- tasks : Sequence[Task] All tasks configured for this benchmark run. language_aggregation_mode : LanguageAggregationMode - The aggregation mode to check compatibility against. + The aggregation mode to check compatibility against (only used + when ``execution_mode`` is ``LAZY``). + execution_mode : ExecutionMode + ``LAZY`` filters incompatible datasets; ``ALL`` keeps everything. Returns ------- @@ -506,6 +510,9 @@ def _get_dataset_ids_to_evaluate( Tasks whose datasets are all incompatible still appear as keys with an empty list. """ + if execution_mode == ExecutionMode.ALL: + return {task.name: list(task.dataset_ids) for task in tasks} + if language_aggregation_mode == LanguageAggregationMode.SKIP_LANGUAGE_AGGREGATION: return {task.name: list(task.dataset_ids) for task in tasks} diff --git a/tests/test_multi_dataset_task.py b/tests/test_multi_dataset_task.py index 07b2df1..d09a525 100644 --- a/tests/test_multi_dataset_task.py +++ b/tests/test_multi_dataset_task.py @@ -22,6 +22,7 @@ from workrb.tasks import ESCOJob2SkillRanking, RankingDataset from workrb.types import ( DatasetLanguages, + ExecutionMode, Language, LanguageAggregationMode, get_language_grouping_key, @@ -476,7 +477,9 @@ def test_monolingual_only_skips_crosslingual(self): ), }, ) - result = _get_dataset_ids_to_evaluate([task], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [task], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {"task1": ["en"]} def test_group_input_keeps_crosslingual_singleton_input(self): @@ -495,7 +498,7 @@ def test_group_input_keeps_crosslingual_singleton_input(self): }, ) result = _get_dataset_ids_to_evaluate( - [task], LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES + [task], LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES, ExecutionMode.LAZY ) assert result == {"task1": ["en_de"]} @@ -531,7 +534,9 @@ def test_monolingual_only_mixed_task_keeps_only_monolingual(self): ), }, ) - result = _get_dataset_ids_to_evaluate([task], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [task], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {"melo_task": ["en", "de"]} def test_group_input_mixed_task_keeps_singleton_input(self): @@ -562,13 +567,15 @@ def test_group_input_mixed_task_keeps_singleton_input(self): }, ) result = _get_dataset_ids_to_evaluate( - [task], LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES + [task], LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES, ExecutionMode.LAZY ) assert result == {"melo_task": ["en", "en_de", "fr_de"]} def test_no_tasks(self): """Empty task list returns empty dict.""" - result = _get_dataset_ids_to_evaluate([], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {} def test_all_datasets_incompatible(self): @@ -586,7 +593,9 @@ def test_all_datasets_incompatible(self): ), }, ) - result = _get_dataset_ids_to_evaluate([task], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [task], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {"task1": []} def test_skip_language_aggregation_keeps_all(self): @@ -609,7 +618,7 @@ def test_skip_language_aggregation_keeps_all(self): }, ) result = _get_dataset_ids_to_evaluate( - [task], LanguageAggregationMode.SKIP_LANGUAGE_AGGREGATION + [task], LanguageAggregationMode.SKIP_LANGUAGE_AGGREGATION, ExecutionMode.LAZY ) assert result == {"task1": ["en", "en_de", "multi"]} From 2133f2fd2ada38d179fb57c4a7ce3d013fc52aaa Mon Sep 17 00:00:00 2001 From: "warre.veys" Date: Fri, 22 May 2026 12:32:55 +0200 Subject: [PATCH 6/7] lint --- src/workrb/rankings.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/workrb/rankings.py b/src/workrb/rankings.py index 2ed58d7..ed75739 100644 --- a/src/workrb/rankings.py +++ b/src/workrb/rankings.py @@ -20,18 +20,30 @@ logger = logging.getLogger(__name__) +# Current on-disk schema version for ranking artifacts. +# +# Bumped when the JSON shape itself changes (field renames, new required +# fields, restructuring of `scores`). Independent of ``workrb_version``: +# a workrb release that does not change the schema keeps the same value. SCHEMA_VERSION = 1 -"""Current on-disk schema version for ranking artifacts. - -Bumped when the JSON shape itself changes (field renames, new required fields, -restructuring of `scores`). Independent of ``workrb_version``: a workrb release -that does not change the schema keeps the same ``SCHEMA_VERSION``. -""" +# Schema versions this reader can parse. Artifacts with any other +# ``schema_version`` are hard-rejected by :func:`validate_header`. SUPPORTED_SCHEMA_VERSIONS: frozenset[int] = frozenset({1}) -"""Schema versions this reader can parse. Artifacts with any other -``schema_version`` are hard-rejected by :func:`validate_header`.""" +# Pinned header field set per schema version. +# +# This is the source of truth that +# :func:`workrb.config.BenchmarkConfig.save_rankings_artifact` must agree +# with. A test (``test_writer_header_matches_pinned_schema``) compares +# freshly written headers against this mapping; mismatch means the writer +# changed without bumping :data:`SCHEMA_VERSION`. When evolving the schema: +# +# 1. Add a new entry ``SCHEMA_HEADER_FIELDS[N] = frozenset({...})``. +# 2. Bump :data:`SCHEMA_VERSION` to ``N``. +# 3. Decide whether to keep the old version in +# :data:`SUPPORTED_SCHEMA_VERSIONS` (back-compat) or drop it +# (force rewrite). SCHEMA_HEADER_FIELDS: dict[int, frozenset[str]] = { 1: frozenset( { @@ -50,18 +62,6 @@ } ), } -"""Pinned header field set per schema version. - -This is the source of truth that :func:`workrb.config.BenchmarkConfig.save_rankings_artifact` -must agree with. A test (``test_writer_header_matches_pinned_schema``) compares -freshly written headers against this mapping; mismatch means the writer changed -without bumping :data:`SCHEMA_VERSION`. When evolving the schema: - -1. Add a new entry ``SCHEMA_HEADER_FIELDS[N] = frozenset({...})``. -2. Bump :data:`SCHEMA_VERSION` to ``N``. -3. Decide whether to keep the old version in :data:`SUPPORTED_SCHEMA_VERSIONS` - (back-compat) or drop it (force rewrite). -""" class RankingsArtifactMissing(FileNotFoundError): From b38ccce8699aa1486d6706f19b6e7f539237f948 Mon Sep 17 00:00:00 2001 From: "warre.veys" Date: Tue, 26 May 2026 16:19:06 +0200 Subject: [PATCH 7/7] tweaks --- README.md | 8 +-- src/workrb/config.py | 32 +++++----- src/workrb/rankings.py | 44 ++++++++++++-- src/workrb/results.py | 3 + src/workrb/run.py | 50 +++++++++------ tests/test_save_rankings_artifacts.py | 87 +++++++++++++++++++++++++-- 6 files changed, 178 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index d9fdf95..deefdde 100644 --- a/README.md +++ b/README.md @@ -227,11 +227,11 @@ Each JSON file has two top-level keys: plus four canary strings (`first_query_text`, `last_query_text`, `first_target_text`, `last_target_text`) used to detect silent dataset drift on replay. -- `scores`: a sparse `{query_index: {target_index: score}}` mapping, with +- `scores`: a `{query_index: {target_index: score}}` mapping, with keys as positional indices into the live dataset's `query_texts` / - `target_space` (stringified, as JSON requires string keys). Scores of - exactly `0.0` are omitted; consumers materialize missing entries as - `0.0`. + `target_space` (stringified, as JSON requires string keys). Every + `(query, target)` cell is stored. Non-finite values (`NaN`, `+inf`, `-inf`) are + rejected at write time. Once written, you can recompute metrics without rerunning the model by pointing `evaluate_rankings` at the model-scoped directory: diff --git a/src/workrb/config.py b/src/workrb/config.py index 1a0292f..219da80 100644 --- a/src/workrb/config.py +++ b/src/workrb/config.py @@ -20,7 +20,7 @@ import numpy as np import yaml -from workrb.rankings import SCHEMA_VERSION +from workrb.rankings import SCHEMA_VERSION, rankings_filename from workrb.results import BenchmarkResults from workrb.tasks.abstract import Task @@ -37,13 +37,6 @@ def _get_workrb_version() -> str: return "unknown" -def rankings_filename(task_name: str, dataset_id: str) -> str: - """Filename used for the artifact of one ``(task, dataset_id)`` pair.""" - safe_task = re.sub(r"[^A-Za-z0-9_.-]+", "_", task_name).strip("_") - safe_dataset = re.sub(r"[^A-Za-z0-9_.-]+", "_", dataset_id).strip("_") - return f"{safe_task}__{safe_dataset}.json" - - @dataclass class BenchmarkConfig: """ @@ -184,11 +177,12 @@ def save_rankings_artifact( ``{"header": {...metadata...}, "scores": {q_idx: {t_idx: score}}}`` Query and target keys are positional indices (stringified, since JSON object keys must be strings); the dataset's row order at the pinned - workrb version is the implicit ID source. + workrb version is the implicit ID source. Every ``(q, t)`` cell is + stored. - Sparsity convention: any score that equals exactly ``0.0`` is omitted - from the row, and consumers materialize missing ``(q, t)`` entries as - ``0.0`` (see :func:`workrb.rankings.materialize_prediction_matrix`). + Non-finite scores (``NaN``, ``+inf``, ``-inf``) are rejected: standard + JSON cannot represent them, and silently coercing would corrupt the + artifact for downstream readers. """ workrb_version = _get_workrb_version() @@ -203,6 +197,14 @@ def save_rankings_artifact( f"prediction_matrix has {num_targets} cols but dataset has " f"{len(dataset.target_space)} targets" ) + if not np.all(np.isfinite(prediction_matrix)): + bad = np.argwhere(~np.isfinite(prediction_matrix)) + sample = bad[0] + raise ValueError( + f"prediction_matrix contains non-finite values (e.g. at " + f"query_index={int(sample[0])}, target_index={int(sample[1])}); " + "JSON cannot represent NaN/inf, refusing to write a corrupt artifact" + ) rankings_path = self.get_task_rankings_path(task_name=task_name, dataset_id=dataset_id) rankings_path.parent.mkdir(parents=True, exist_ok=True) @@ -210,9 +212,7 @@ def save_rankings_artifact( scores: dict[str, dict[str, float]] = {} matrix = prediction_matrix.tolist() for q_idx, row in enumerate(matrix): - scores[str(q_idx)] = { - str(t_idx): float(score) for t_idx, score in enumerate(row) if score != 0 - } + scores[str(q_idx)] = {str(t_idx): float(score) for t_idx, score in enumerate(row)} payload = { "header": { @@ -232,7 +232,7 @@ def save_rankings_artifact( "scores": scores, } with open(rankings_path, "w") as f: - json.dump(payload, f, indent=2) + json.dump(payload, f, indent=2, allow_nan=False) logger.debug(f"Ranking artifact saved to {rankings_path}") return rankings_path diff --git a/src/workrb/rankings.py b/src/workrb/rankings.py index ed75739..a61c165 100644 --- a/src/workrb/rankings.py +++ b/src/workrb/rankings.py @@ -10,6 +10,8 @@ import json import logging +import math +import re from pathlib import Path from typing import TYPE_CHECKING @@ -64,6 +66,13 @@ } +def rankings_filename(task_name: str, dataset_id: str) -> str: + """Filename used for the artifact of one ``(task, dataset_id)`` pair.""" + safe_task = re.sub(r"[^A-Za-z0-9_.-]+", "_", task_name).strip("_") + safe_dataset = re.sub(r"[^A-Za-z0-9_.-]+", "_", dataset_id).strip("_") + return f"{safe_task}__{safe_dataset}.json" + + class RankingsArtifactMissing(FileNotFoundError): """Raised when a needed ranking artifact file is not on disk.""" @@ -89,6 +98,9 @@ def load_rankings_artifact(path: Path) -> tuple[dict, dict[int, dict[int, float] """Load a ranking artifact and return ``(header, scores)`` with integer keys. JSON object keys are always strings, so both axes are cast back to ``int``. + Non-finite score values (``NaN``, ``+inf``, ``-inf``) are rejected: the + writer refuses to emit them, so their presence indicates a hand-edited or + corrupt artifact. """ with open(path) as f: payload = json.load(f) @@ -98,9 +110,19 @@ def load_rankings_artifact(path: Path) -> tuple[dict, dict[int, dict[int, float] header = payload["header"] raw_scores = payload["scores"] - scores: dict[int, dict[int, float]] = { - int(q): {int(t): float(s) for t, s in row.items()} for q, row in raw_scores.items() - } + scores: dict[int, dict[int, float]] = {} + for q, row in raw_scores.items(): + q_idx = int(q) + parsed_row: dict[int, float] = {} + for t, s in row.items(): + value = float(s) + if not math.isfinite(value): + raise RankingsArtifactInvalid( + path, + f"non-finite score at query_index={q_idx}, target_index={int(t)}: {value!r}", + ) + parsed_row[int(t)] = value + scores[q_idx] = parsed_row return header, scores @@ -109,10 +131,24 @@ def materialize_prediction_matrix( num_queries: int, num_targets: int, ) -> np.ndarray: - """Build a dense ``(num_queries, num_targets)`` matrix from sparse scores.""" + """Build a dense ``(num_queries, num_targets)`` matrix from the score map. + + Raises ``IndexError`` if any ``(q_idx, t_idx)`` falls outside the declared + shape. Callers are expected to have validated the header against the live + dataset first; this is a defensive backstop against malformed artifacts. + """ matrix = np.zeros((num_queries, num_targets), dtype=np.float32) for q_idx, row in scores.items(): + if not 0 <= q_idx < num_queries: + raise IndexError( + f"query_index {q_idx} out of bounds for num_queries={num_queries}" + ) for t_idx, score in row.items(): + if not 0 <= t_idx < num_targets: + raise IndexError( + f"target_index {t_idx} out of bounds for num_targets={num_targets} " + f"(at query_index={q_idx})" + ) matrix[q_idx, t_idx] = score return matrix diff --git a/src/workrb/results.py b/src/workrb/results.py index fa29ce9..0152925 100644 --- a/src/workrb/results.py +++ b/src/workrb/results.py @@ -61,6 +61,9 @@ class BenchmarkMetadata(BaseModel): languages: list[str] resumed_from_checkpoint: bool = False language_aggregation_mode: str = LanguageAggregationMode.MONOLINGUAL_ONLY.value + replayed_from_workrb_version: str | None = None + """workrb version that wrote the ranking artifacts when this run was + produced by :func:`workrb.evaluate_rankings`; ``None`` for normal runs.""" class ResultTagString(BaseModel): diff --git a/src/workrb/run.py b/src/workrb/run.py index 5c5d5f8..770cc7e 100644 --- a/src/workrb/run.py +++ b/src/workrb/run.py @@ -12,7 +12,7 @@ from pathlib import Path from typing import Any -from workrb.config import BenchmarkConfig, _get_workrb_version, rankings_filename +from workrb.config import BenchmarkConfig, _get_workrb_version from workrb.logging import setup_logger from workrb.metrics.reporting import format_results from workrb.models.base import ModelInterface @@ -21,6 +21,7 @@ RankingsArtifactMissing, load_rankings_artifact, materialize_prediction_matrix, + rankings_filename, validate_header, ) from workrb.results import ( @@ -220,13 +221,17 @@ def evaluate_rankings( No model is required. ``rankings_dir`` is expected to be the ``/rankings//`` directory produced by a - previous ``evaluate(..., save_rankings=True)`` run, or by an external - submission that wrote the same schema. + previous ``evaluate(..., save_rankings=True)`` run. + + Always rescores from scratch: any existing ``results.json`` or + ``checkpoint.json`` under ``output_folder`` is overwritten without + consulting it. Args: rankings_dir: Directory containing per-``(task, dataset_id)`` JSON artifacts produced by :meth:`BenchmarkConfig.save_rankings_artifact`. - tasks: Tasks to score. Non-ranking tasks are skipped with a warning. + tasks: Tasks to score. Must all be ``RankingTask`` instances; passing + any other task type raises ``ValueError``. output_folder: Where to write ``results.json``, ``config.yaml``, and ``checkpoint.json`` (same layout as :func:`evaluate`). metrics: Optional dict mapping task names to custom metrics lists. @@ -243,7 +248,13 @@ def evaluate_rankings( Raises ------ - RankingsArtifactMissing: An expected artifact is not on disk. + ValueError: ``tasks`` contains any non-``RankingTask`` entry. + FileNotFoundError: ``rankings_dir`` does not exist, or exists but + contains no ``*.json`` artifacts. + RankingsArtifactMissing: A specific ``(task, dataset_id)`` artifact + expected by the scoring loop is not on disk. Subclasses + ``FileNotFoundError``, so a single ``except FileNotFoundError`` + catches both this and the directory-level case above. RankingsArtifactInvalid: An artifact's header does not match the live dataset, or multiple model_names are present in ``rankings_dir``. @@ -265,7 +276,15 @@ def evaluate_rankings( f"rankings_dir does not exist or is not a directory: {rankings_dir}" ) - model_name = _peek_model_name(rankings_dir) + non_ranking = [task.name for task in tasks if not isinstance(task, RankingTask)] + if non_ranking: + raise ValueError( + f"evaluate_rankings only supports RankingTask, got non-ranking tasks: " + f"{non_ranking}. Filter your task list before calling." + ) + ranking_tasks: list[RankingTask] = [task for task in tasks if isinstance(task, RankingTask)] + + model_name, source_workrb_version = _peek_artifact_origin(rankings_dir) config = BenchmarkConfig( model_name=model_name, @@ -291,6 +310,7 @@ def evaluate_rankings( languages=_get_all_languages(tasks), resumed_from_checkpoint=False, language_aggregation_mode=language_aggregation_mode.value, + replayed_from_workrb_version=source_workrb_version, ), key_metrics_by_task_group=key_metrics_by_task_group, ) @@ -299,13 +319,7 @@ def evaluate_rankings( start_time_benchmark = time.time() logger.info(f"Replaying rankings from {rankings_dir} (model: {model_name})") - for task in tasks: - if not isinstance(task, RankingTask): - logger.warning( - "Skipping task '%s': evaluate_rankings only supports RankingTask.", task.name - ) - continue - + for task in ranking_tasks: scoped = dataset_ids_to_evaluate.get(task.name, []) if not scoped: continue @@ -392,12 +406,14 @@ def evaluate_rankings( return results -def _peek_model_name(rankings_dir: Path) -> str: - """Read ``model_name`` from the first artifact in ``rankings_dir``. +def _peek_artifact_origin(rankings_dir: Path) -> tuple[str, str | None]: + """Read ``(model_name, workrb_version)`` from the first artifact in ``rankings_dir``. All artifacts in one directory must agree on ``model_name``; later artifacts that disagree raise :class:`RankingsArtifactInvalid` when - they are loaded for scoring. + they are loaded for scoring. ``workrb_version`` is informational and + surfaced in ``BenchmarkMetadata.replayed_from_workrb_version`` so the + replay's ``results.json`` records which version produced the scores. """ json_files = sorted(rankings_dir.glob("*.json")) if not json_files: @@ -406,7 +422,7 @@ def _peek_model_name(rankings_dir: Path) -> str: model_name = header.get("model_name") if not model_name: raise RankingsArtifactInvalid(json_files[0], "header missing 'model_name'") - return model_name + return model_name, header.get("workrb_version") def get_tasks_overview( diff --git a/tests/test_save_rankings_artifacts.py b/tests/test_save_rankings_artifacts.py index 8a11d2f..b17bdc2 100644 --- a/tests/test_save_rankings_artifacts.py +++ b/tests/test_save_rankings_artifacts.py @@ -83,10 +83,11 @@ def _compute_rankings( query_input_type: ModelInputType, target_input_type: ModelInputType, ) -> torch.Tensor: - # 2 queries x 3 targets, with one explicit zero to exercise sparsity + # 2 queries x 3 targets, including 0.0 and a negative score to confirm + # both roundtrip exactly (no sparsity). return torch.tensor( [ - [0.1, 0.0, 0.3], + [0.1, 0.0, -0.3], [0.4, 0.5, 0.6], ], dtype=torch.float32, @@ -155,10 +156,11 @@ def test_evaluate_saves_rankings_artifact_with_new_schema(): scores = payload["scores"] assert set(scores.keys()) == {"0", "1"} - # Row 0 has a zero on target index 1 (sparse omission) - assert set(scores["0"].keys()) == {"0", "2"} + # Every (q, t) cell is stored, including 0.0 and negatives. + assert set(scores["0"].keys()) == {"0", "1", "2"} assert scores["0"]["0"] == pytest.approx(0.1) - assert scores["0"]["2"] == pytest.approx(0.3) + assert scores["0"]["1"] == pytest.approx(0.0) + assert scores["0"]["2"] == pytest.approx(-0.3) assert set(scores["1"].keys()) == {"0", "1", "2"} assert scores["1"]["0"] == pytest.approx(0.4) assert scores["1"]["1"] == pytest.approx(0.5) @@ -204,6 +206,12 @@ def test_evaluate_rankings_roundtrip_parity(): for key, value in write_metrics.items(): assert replay_metrics[key] == pytest.approx(value) + # Replay records which workrb version wrote the source artifacts; + # the original evaluate() run leaves the field unset. + assert write_results["metadata"].get("replayed_from_workrb_version") is None + artifact_header = json.loads(next(rankings_dir.glob("*.json")).read_text())["header"] + assert replay.metadata.replayed_from_workrb_version == artifact_header["workrb_version"] + def test_evaluate_rankings_missing_artifact_raises(): """A missing artifact file raises RankingsArtifactMissing.""" @@ -324,6 +332,75 @@ def test_evaluate_rankings_unknown_schema_version_raises(): ) +def test_save_rankings_rejects_non_finite_scores(): + """The writer refuses NaN/inf because JSON cannot represent them safely.""" + import numpy as np + + from workrb.config import BenchmarkConfig + + dataset = TinyRankingTask( + split=DatasetSplit.TEST, languages=[Language.EN] + ).datasets["en"] + config = BenchmarkConfig( + model_name="tiny", + output_folder=str(_fresh_dir("rankings_artifact_non_finite_write")), + ) + matrix = np.array([[0.1, float("nan"), 0.3], [0.4, 0.5, 0.6]], dtype=np.float32) + with pytest.raises(ValueError, match="non-finite"): + config.save_rankings_artifact( + task_name="Tiny Ranking Task", + dataset_id="en", + split="test", + dataset=dataset, + prediction_matrix=matrix, + ) + + +def test_evaluate_rankings_rejects_non_finite_scores_in_artifact(): + """A hand-edited NaN in the artifact is rejected at load time.""" + write_dir = _fresh_dir("rankings_artifact_non_finite_read") + rankings_dir = _run_and_get_rankings_dir(write_dir) + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + # Inject NaN by editing the parsed structure and re-emitting with + # allow_nan=True. json.load (used by the reader) accepts the non-standard + # NaN token, which is exactly what validate_header/load_rankings_artifact + # must catch. + payload = json.loads(files[0].read_text()) + payload["scores"]["0"]["0"] = float("nan") + files[0].write_text(json.dumps(payload, allow_nan=True)) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_non_finite_read_out") + with pytest.raises(RankingsArtifactInvalid, match="non-finite"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_rejects_out_of_bounds_target_index(): + """A target_index outside [0, num_targets) is rejected with IndexError.""" + write_dir = _fresh_dir("rankings_artifact_oob_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + payload = json.loads(files[0].read_text()) + # num_targets is 3 in TinyRankingTask, so 999 is out of bounds. + payload["scores"]["0"]["999"] = 0.42 + files[0].write_text(json.dumps(payload)) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_oob_read") + with pytest.raises(IndexError, match="target_index 999"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + def test_evaluate_rankings_version_mismatch_warns_but_proceeds(): """workrb_version mismatch only logs a warning; metrics still computed.""" write_dir = _fresh_dir("rankings_artifact_version_write")