diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 595b7e6..2b3e23b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -113,6 +113,7 @@ Make a pull request (PR) from your fork into the main branch of WorkRB, followin - [ ] Code follows project style guidelines - [ ] Documentation updated - [ ] No new warnings introduced + - [ ] If the rankings artifact schema changed: bumped SCHEMA_VERSION in workrb/rankings.py and updated SUPPORTED_SCHEMA_VERSIONS ``` ### 4. Review Process diff --git a/README.md b/README.md index 2a56472..b42b42b 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ results = workrb.evaluate( # Returns BenchmarkResults (Pydantic model) model, tasks, output_folder="results/my_model", + save_rankings=False, # Optional: store full per-target score arrays for ranking tasks ) print(results) # Benchmark/Per-task/Per-language metrics ``` @@ -210,6 +211,48 @@ results/my_model/ └── config.yaml # Final benchmark configuration dump ``` +If you pass `save_rankings=True` to `evaluate`, WorkRB also writes per-task, +per-dataset ranking score artifacts under a model-scoped subdirectory: + +``` +results/my_model/ +└── rankings/ + └── <model_name>/ + └── <task_name>__<dataset_id>.json +``` + +Each JSON file has two top-level keys: + +- `header`: identifies the artifact and pins it to a specific dataset + shape. Includes `schema_version`, `workrb_version`, `model_name`, + `task_name`, `dataset_id`, `split`, `num_queries`, `num_targets`, + plus four canary strings (`first_query_text`, `last_query_text`, + `first_target_text`, `last_target_text`) used to detect silent dataset + drift on replay. +- `scores`: a `{query_index: {target_index: score}}` mapping, with + keys as positional indices into the live dataset's `query_texts` / + `target_space` (stringified, as JSON requires string keys). Every + `(query, target)` cell is stored. Non-finite values (`NaN`, `+inf`, `-inf`) are + rejected at write time. 
+ +Once written, you can recompute metrics without rerunning the model by +pointing `evaluate_rankings` at the model-scoped directory: + +```python +results = workrb.evaluate_rankings( + rankings_dir="results/my_model/rankings/my_model", + tasks=tasks, + output_folder="results/my_model_replay", +) +``` + +This is the recommended way to re-score after a metric definition has +changed (e.g. a new ranking metric is added in a workrb release): replay +is cheap, the model never runs again, and `validate_header` rejects any +artifact whose dataset shape no longer matches the live one. A +`workrb_version` mismatch only logs a warning; an unknown +`schema_version` is a hard reject. + To load & parse results from a run: ```python diff --git a/src/workrb/__init__.py b/src/workrb/__init__.py index 026ce95..c1b85ff 100644 --- a/src/workrb/__init__.py +++ b/src/workrb/__init__.py @@ -4,9 +4,15 @@ from workrb import data, metrics, models, tasks from workrb.logging import setup_logger +from workrb.rankings import RankingsArtifactInvalid, RankingsArtifactMissing from workrb.registry import list_available_tasks from workrb.results import load_results -from workrb.run import evaluate, evaluate_multiple_models, get_tasks_overview +from workrb.run import ( + evaluate, + evaluate_multiple_models, + evaluate_rankings, + get_tasks_overview, +) from workrb.types import ExecutionMode, LanguageAggregationMode # Configure 'workrb' logger to INFO level by default, by usage of package @@ -15,9 +21,12 @@ __all__ = [ "ExecutionMode", "LanguageAggregationMode", + "RankingsArtifactInvalid", + "RankingsArtifactMissing", "data", "evaluate", "evaluate_multiple_models", + "evaluate_rankings", "get_tasks_overview", "list_available_tasks", "load_results", diff --git a/src/workrb/config.py b/src/workrb/config.py index 14daf6a..219da80 100644 --- a/src/workrb/config.py +++ b/src/workrb/config.py @@ -7,21 +7,36 @@ import json import logging +import re import time from collections.abc import Sequence from 
dataclasses import asdict, dataclass, field from datetime import datetime, timezone +from importlib.metadata import PackageNotFoundError +from importlib.metadata import version as _pkg_version from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any +import numpy as np import yaml +from workrb.rankings import SCHEMA_VERSION, rankings_filename from workrb.results import BenchmarkResults from workrb.tasks.abstract import Task +if TYPE_CHECKING: + from workrb.tasks.abstract.ranking_base import RankingDataset + logger = logging.getLogger(__name__) +def _get_workrb_version() -> str: + try: + return _pkg_version("workrb") + except PackageNotFoundError: + return "unknown" + + @dataclass class BenchmarkConfig: """ @@ -134,6 +149,93 @@ def get_results_path(self) -> Path: """Get the path where final results should be saved.""" return self.get_output_path() / "results.json" + def get_rankings_dir(self) -> Path: + """Get the directory where per-dataset ranking artifacts are saved. + + Rankings are nested under a sanitized model-name directory so that + running multiple models into the same ``output_folder`` cannot clobber + each other's ranking files. + """ + safe_model_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", self.model_name).strip("_") + return self.get_output_path() / "rankings" / safe_model_name + + def get_task_rankings_path(self, task_name: str, dataset_id: str) -> Path: + """Get the output path for one task/dataset ranking artifact.""" + return self.get_rankings_dir() / rankings_filename(task_name, dataset_id) + + def save_rankings_artifact( + self, + task_name: str, + dataset_id: str, + split: str, + dataset: "RankingDataset", + prediction_matrix: np.ndarray, + ) -> Path: + """Save the prediction matrix for one ``(task, dataset_id)`` as a JSON artifact. 
+ + Schema: + ``{"header": {...metadata...}, "scores": {q_idx: {t_idx: score}}}`` + Query and target keys are positional indices (stringified, since JSON + object keys must be strings); the dataset's row order at the pinned + workrb version is the implicit ID source. Every ``(q, t)`` cell is + stored. + + Non-finite scores (``NaN``, ``+inf``, ``-inf``) are rejected: standard + JSON cannot represent them, and silently coercing would corrupt the + artifact for downstream readers. + """ + workrb_version = _get_workrb_version() + + num_queries, num_targets = prediction_matrix.shape + if num_queries != len(dataset.query_texts): + raise ValueError( + f"prediction_matrix has {num_queries} rows but dataset has " + f"{len(dataset.query_texts)} queries" + ) + if num_targets != len(dataset.target_space): + raise ValueError( + f"prediction_matrix has {num_targets} cols but dataset has " + f"{len(dataset.target_space)} targets" + ) + if not np.all(np.isfinite(prediction_matrix)): + bad = np.argwhere(~np.isfinite(prediction_matrix)) + sample = bad[0] + raise ValueError( + f"prediction_matrix contains non-finite values (e.g. 
at " + f"query_index={int(sample[0])}, target_index={int(sample[1])}); " + "JSON cannot represent NaN/inf, refusing to write a corrupt artifact" + ) + + rankings_path = self.get_task_rankings_path(task_name=task_name, dataset_id=dataset_id) + rankings_path.parent.mkdir(parents=True, exist_ok=True) + + scores: dict[str, dict[str, float]] = {} + matrix = prediction_matrix.tolist() + for q_idx, row in enumerate(matrix): + scores[str(q_idx)] = {str(t_idx): float(score) for t_idx, score in enumerate(row)} + + payload = { + "header": { + "schema_version": SCHEMA_VERSION, + "workrb_version": workrb_version, + "model_name": self.model_name, + "task_name": task_name, + "dataset_id": dataset_id, + "split": split, + "num_queries": int(num_queries), + "num_targets": int(num_targets), + "first_query_text": dataset.query_texts[0], + "last_query_text": dataset.query_texts[-1], + "first_target_text": dataset.target_space[0], + "last_target_text": dataset.target_space[-1], + }, + "scores": scores, + } + with open(rankings_path, "w") as f: + json.dump(payload, f, indent=2, allow_nan=False) + logger.debug(f"Ranking artifact saved to {rankings_path}") + return rankings_path + def has_checkpoint(self) -> bool: """Check if a checkpoint exists.""" return self.get_checkpoint_path().exists() diff --git a/src/workrb/rankings.py b/src/workrb/rankings.py new file mode 100644 index 0000000..a61c165 --- /dev/null +++ b/src/workrb/rankings.py @@ -0,0 +1,231 @@ +"""Loading and validation utilities for ranking artifacts. + +A ranking artifact is the JSON file produced by +:meth:`BenchmarkConfig.save_rankings_artifact` for one ``(task, dataset_id)`` +pair. This module provides the read-side counterparts used by +:func:`workrb.evaluate_rankings` to replay metrics without re-running a model. 
+""" + +from __future__ import annotations + +import json +import logging +import math +import re +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from workrb.tasks.abstract.ranking_base import RankingDataset + +logger = logging.getLogger(__name__) + +# Current on-disk schema version for ranking artifacts. +# +# Bumped when the JSON shape itself changes (field renames, new required +# fields, restructuring of `scores`). Independent of ``workrb_version``: +# a workrb release that does not change the schema keeps the same value. +SCHEMA_VERSION = 1 + +# Schema versions this reader can parse. Artifacts with any other +# ``schema_version`` are hard-rejected by :func:`validate_header`. +SUPPORTED_SCHEMA_VERSIONS: frozenset[int] = frozenset({1}) + +# Pinned header field set per schema version. +# +# This is the source of truth that +# :func:`workrb.config.BenchmarkConfig.save_rankings_artifact` must agree +# with. A test (``test_writer_header_matches_pinned_schema``) compares +# freshly written headers against this mapping; mismatch means the writer +# changed without bumping :data:`SCHEMA_VERSION`. When evolving the schema: +# +# 1. Add a new entry ``SCHEMA_HEADER_FIELDS[N] = frozenset({...})``. +# 2. Bump :data:`SCHEMA_VERSION` to ``N``. +# 3. Decide whether to keep the old version in +# :data:`SUPPORTED_SCHEMA_VERSIONS` (back-compat) or drop it +# (force rewrite). 
+SCHEMA_HEADER_FIELDS: dict[int, frozenset[str]] = { + 1: frozenset( + { + "schema_version", + "workrb_version", + "model_name", + "task_name", + "dataset_id", + "split", + "num_queries", + "num_targets", + "first_query_text", + "last_query_text", + "first_target_text", + "last_target_text", + } + ), +} + + +def rankings_filename(task_name: str, dataset_id: str) -> str: + """Filename used for the artifact of one ``(task, dataset_id)`` pair.""" + safe_task = re.sub(r"[^A-Za-z0-9_.-]+", "_", task_name).strip("_") + safe_dataset = re.sub(r"[^A-Za-z0-9_.-]+", "_", dataset_id).strip("_") + return f"{safe_task}__{safe_dataset}.json" + + +class RankingsArtifactMissing(FileNotFoundError): + """Raised when a needed ranking artifact file is not on disk.""" + + def __init__(self, path: Path, task_name: str, dataset_id: str): + self.path = path + self.task_name = task_name + self.dataset_id = dataset_id + super().__init__( + f"No ranking artifact for task '{task_name}', dataset '{dataset_id}' at {path}" + ) + + +class RankingsArtifactInvalid(ValueError): + """Raised when a ranking artifact does not match the current dataset.""" + + def __init__(self, path: Path, reason: str): + self.path = path + self.reason = reason + super().__init__(f"Invalid ranking artifact at {path}: {reason}") + + +def load_rankings_artifact(path: Path) -> tuple[dict, dict[int, dict[int, float]]]: + """Load a ranking artifact and return ``(header, scores)`` with integer keys. + + JSON object keys are always strings, so both axes are cast back to ``int``. + Non-finite score values (``NaN``, ``+inf``, ``-inf``) are rejected: the + writer refuses to emit them, so their presence indicates a hand-edited or + corrupt artifact. 
+ """ + with open(path) as f: + payload = json.load(f) + + if "header" not in payload or "scores" not in payload: + raise RankingsArtifactInvalid(path, "missing 'header' or 'scores' top-level keys") + + header = payload["header"] + raw_scores = payload["scores"] + scores: dict[int, dict[int, float]] = {} + for q, row in raw_scores.items(): + q_idx = int(q) + parsed_row: dict[int, float] = {} + for t, s in row.items(): + value = float(s) + if not math.isfinite(value): + raise RankingsArtifactInvalid( + path, + f"non-finite score at query_index={q_idx}, target_index={int(t)}: {value!r}", + ) + parsed_row[int(t)] = value + scores[q_idx] = parsed_row + return header, scores + + +def materialize_prediction_matrix( + scores: dict[int, dict[int, float]], + num_queries: int, + num_targets: int, +) -> np.ndarray: + """Build a dense ``(num_queries, num_targets)`` matrix from the score map. + + Raises ``IndexError`` if any ``(q_idx, t_idx)`` falls outside the declared + shape. Callers are expected to have validated the header against the live + dataset first; this is a defensive backstop against malformed artifacts. + """ + matrix = np.zeros((num_queries, num_targets), dtype=np.float32) + for q_idx, row in scores.items(): + if not 0 <= q_idx < num_queries: + raise IndexError( + f"query_index {q_idx} out of bounds for num_queries={num_queries}" + ) + for t_idx, score in row.items(): + if not 0 <= t_idx < num_targets: + raise IndexError( + f"target_index {t_idx} out of bounds for num_targets={num_targets} " + f"(at query_index={q_idx})" + ) + matrix[q_idx, t_idx] = score + return matrix + + +def validate_header( + header: dict, + dataset: RankingDataset, + task_name: str, + dataset_id: str, + split: str, + path: Path, + running_workrb_version: str, +) -> None: + """Validate an artifact header against the live dataset. + + Checks are run in three tiers: + + 1. ``schema_version`` must be in :data:`SUPPORTED_SCHEMA_VERSIONS`. 
This + gates whether the reader can parse the file at all; an unknown schema + version is a hard reject. + 2. Structural identity (task name, dataset id, split, sizes, canary + strings) must match the live dataset. Any mismatch is a hard reject. + 3. ``workrb_version`` is informational: a difference logs a warning but + does not block scoring. Cross-version replay is the intended use case + for :func:`workrb.evaluate_rankings` (metrics may have changed); the + real safety net is tier 2. + """ + schema_version = header.get("schema_version") + if schema_version not in SUPPORTED_SCHEMA_VERSIONS: + raise RankingsArtifactInvalid( + path, + f"unsupported schema_version {schema_version!r}; this reader " + f"supports {sorted(SUPPORTED_SCHEMA_VERSIONS)}", + ) + + if header.get("task_name") != task_name: + raise RankingsArtifactInvalid( + path, + f"header task_name '{header.get('task_name')}' does not match task '{task_name}'", + ) + if header.get("dataset_id") != dataset_id: + raise RankingsArtifactInvalid( + path, + f"header dataset_id '{header.get('dataset_id')}' does not match dataset '{dataset_id}'", + ) + if header.get("split") != split: + raise RankingsArtifactInvalid( + path, + f"header split '{header.get('split')}' does not match task split '{split}'", + ) + if header.get("num_queries") != len(dataset.query_texts): + raise RankingsArtifactInvalid( + path, + f"num_queries mismatch: header={header.get('num_queries')}, " + f"dataset={len(dataset.query_texts)}", + ) + if header.get("num_targets") != len(dataset.target_space): + raise RankingsArtifactInvalid( + path, + f"num_targets mismatch: header={header.get('num_targets')}, " + f"dataset={len(dataset.target_space)}", + ) + if header.get("first_query_text") != dataset.query_texts[0]: + raise RankingsArtifactInvalid(path, "first_query_text canary mismatch") + if header.get("last_query_text") != dataset.query_texts[-1]: + raise RankingsArtifactInvalid(path, "last_query_text canary mismatch") + if 
header.get("first_target_text") != dataset.target_space[0]: + raise RankingsArtifactInvalid(path, "first_target_text canary mismatch") + if header.get("last_target_text") != dataset.target_space[-1]: + raise RankingsArtifactInvalid(path, "last_target_text canary mismatch") + + header_version = header.get("workrb_version") + if header_version != running_workrb_version: + logger.warning( + "Ranking artifact %s was written by workrb %s, current is %s. " + "Sizes and canaries match, proceeding.", + path, + header_version, + running_workrb_version, + ) diff --git a/src/workrb/results.py b/src/workrb/results.py index fa29ce9..0152925 100644 --- a/src/workrb/results.py +++ b/src/workrb/results.py @@ -61,6 +61,9 @@ class BenchmarkMetadata(BaseModel): languages: list[str] resumed_from_checkpoint: bool = False language_aggregation_mode: str = LanguageAggregationMode.MONOLINGUAL_ONLY.value + replayed_from_workrb_version: str | None = None + """workrb version that wrote the ranking artifacts when this run was + produced by :func:`workrb.evaluate_rankings`; ``None`` for normal runs.""" class ResultTagString(BaseModel): diff --git a/src/workrb/run.py b/src/workrb/run.py index aefdeb1..770cc7e 100644 --- a/src/workrb/run.py +++ b/src/workrb/run.py @@ -9,12 +9,21 @@ import time from collections import Counter from collections.abc import Sequence +from pathlib import Path from typing import Any -from workrb.config import BenchmarkConfig +from workrb.config import BenchmarkConfig, _get_workrb_version from workrb.logging import setup_logger from workrb.metrics.reporting import format_results from workrb.models.base import ModelInterface +from workrb.rankings import ( + RankingsArtifactInvalid, + RankingsArtifactMissing, + load_rankings_artifact, + materialize_prediction_matrix, + rankings_filename, + validate_header, +) from workrb.results import ( BenchmarkMetadata, BenchmarkResults, @@ -23,6 +32,7 @@ TaskResults, ) from workrb.tasks.abstract.base import Task +from 
workrb.tasks.abstract.ranking_base import RankingTask from workrb.types import ExecutionMode, LanguageAggregationMode, get_language_grouping_key logger = logging.getLogger(__name__) @@ -36,6 +46,7 @@ def evaluate( metrics: dict[str, list[str]] | None = None, description: str = "", force_restart: bool = False, + save_rankings: bool = False, language_aggregation_mode: LanguageAggregationMode = LanguageAggregationMode.MONOLINGUAL_ONLY, execution_mode: ExecutionMode = ExecutionMode.LAZY, ) -> BenchmarkResults: @@ -49,6 +60,10 @@ def evaluate( metrics: Optional dict mapping task names to custom metrics lists description: Description for the benchmark run force_restart: If True, ignore checkpoints and restart from beginning + save_rankings: If True, save per-target ranking score arrays for each + ranking task dataset under + ``<output_folder>/rankings/<model_name>/`` as JSON artifacts. + Has no effect for non-ranking tasks. Defaults to False. language_aggregation_mode: How per-language results should be grouped when calling ``get_summary_metrics()`` on the returned results. 
When ``execution_mode`` is ``LAZY``, datasets that are @@ -79,10 +94,9 @@ def evaluate( ) # Determine which datasets are in scope for this run - if execution_mode == ExecutionMode.LAZY: - dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate(tasks, language_aggregation_mode) - else: - dataset_ids_to_evaluate = {task.name: list(task.dataset_ids) for task in tasks} + dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate( + tasks, language_aggregation_mode, execution_mode + ) pending_work = _filter_pending_work(pending_work, dataset_ids_to_evaluate) total_evaluations = sum(len(dids) for dids in dataset_ids_to_evaluate.values()) @@ -111,6 +125,7 @@ def evaluate( results=results, model=model, metrics=metrics, + save_rankings=save_rankings, total_evaluations=total_evaluations, ) if results.metadata.resumed_from_checkpoint: @@ -193,6 +208,223 @@ def evaluate_multiple_models( return all_results +def evaluate_rankings( + rankings_dir: str | Path, + tasks: Sequence[Task], + output_folder: str, + metrics: dict[str, list[str]] | None = None, + description: str = "", + language_aggregation_mode: LanguageAggregationMode = LanguageAggregationMode.MONOLINGUAL_ONLY, + execution_mode: ExecutionMode = ExecutionMode.LAZY, +) -> BenchmarkResults: + """Compute benchmark metrics by replaying saved ranking artifacts. + + No model is required. ``rankings_dir`` is expected to be the + ``<output_folder>/rankings/<model_name>/`` directory produced by a + previous ``evaluate(..., save_rankings=True)`` run. + + Always rescores from scratch: any existing ``results.json`` or + ``checkpoint.json`` under ``output_folder`` is overwritten without + consulting it. + + Args: + rankings_dir: Directory containing per-``(task, dataset_id)`` JSON + artifacts produced by :meth:`BenchmarkConfig.save_rankings_artifact`. + tasks: Tasks to score. Must all be ``RankingTask`` instances; passing + any other task type raises ``ValueError``. 
+ output_folder: Where to write ``results.json``, ``config.yaml``, and + ``checkpoint.json`` (same layout as :func:`evaluate`). + metrics: Optional dict mapping task names to custom metrics lists. + description: Description for the benchmark run. + language_aggregation_mode: How per-language results should be grouped + in the returned ``BenchmarkResults``. + execution_mode: ``LAZY`` skips datasets incompatible with the chosen + aggregation mode, ``ALL`` scores everything for which an artifact + exists. + + Returns + ------- + ``BenchmarkResults`` populated from the artifacts. + + Raises + ------ + ValueError: ``tasks`` contains any non-``RankingTask`` entry. + FileNotFoundError: ``rankings_dir`` does not exist, or exists but + contains no ``*.json`` artifacts. + RankingsArtifactMissing: A specific ``(task, dataset_id)`` artifact + expected by the scoring loop is not on disk. Subclasses + ``FileNotFoundError``, so a single ``except FileNotFoundError`` + catches both this and the directory-level case above. + RankingsArtifactInvalid: An artifact's header does not match the live + dataset, or multiple model_names are present in ``rankings_dir``. + + Notes + ----- + Cross-version replay (an artifact produced by a different ``workrb_version``) + is supported and is in fact the main reason this entry point exists: it + lets you recompute metrics after metric definitions change without rerunning + the model. Such a version drift only logs a warning. The real safety net + is :func:`workrb.rankings.validate_header`'s structural and canary checks, + which catch any change to dataset construction (deduplication, + postprocessing, source data) and raise :class:`RankingsArtifactInvalid`. + An unknown ``schema_version`` is a hard reject regardless of workrb + version. 
+ """ + rankings_dir = Path(rankings_dir) + if not rankings_dir.is_dir(): + raise FileNotFoundError( + f"rankings_dir does not exist or is not a directory: {rankings_dir}" + ) + + non_ranking = [task.name for task in tasks if not isinstance(task, RankingTask)] + if non_ranking: + raise ValueError( + f"evaluate_rankings only supports RankingTask, got non-ranking tasks: " + f"{non_ranking}. Filter your task list before calling." + ) + ranking_tasks: list[RankingTask] = [task for task in tasks if isinstance(task, RankingTask)] + + model_name, source_workrb_version = _peek_artifact_origin(rankings_dir) + + config = BenchmarkConfig( + model_name=model_name, + task_configs=[task.get_task_config() for task in tasks], + languages=sorted({lang.value for task in tasks for lang in task.languages}), + custom_metrics=metrics, + output_folder=output_folder, + description=description, + ) + + dataset_ids_to_evaluate = _get_dataset_ids_to_evaluate( + tasks, language_aggregation_mode, execution_mode + ) + + key_metrics_by_task_group = {task.task_group.value: task.default_metrics for task in tasks} + results = BenchmarkResults( + task_results={}, + metadata=BenchmarkMetadata( + model_name=model_name, + total_evaluation_time=0.0, + timestamp=time.time(), + num_tasks=len(tasks), + languages=_get_all_languages(tasks), + resumed_from_checkpoint=False, + language_aggregation_mode=language_aggregation_mode.value, + replayed_from_workrb_version=source_workrb_version, + ), + key_metrics_by_task_group=key_metrics_by_task_group, + ) + + running_version = _get_workrb_version() + start_time_benchmark = time.time() + logger.info(f"Replaying rankings from {rankings_dir} (model: {model_name})") + + for task in ranking_tasks: + scoped = dataset_ids_to_evaluate.get(task.name, []) + if not scoped: + continue + + if task.name not in results.task_results: + results.task_results[task.name] = TaskResults( + metadata=TaskResultMetadata( + task_group=task.task_group.value, + task_type=task.task_type.value, 
+ label_type=task.label_type.value, + description=task.description, + split=task.split.value, + ), + datasetid_results={}, + ) + + logger.info(f"{'=' * 60}") + logger.info(f"Scoring task: {task.name}") + + for dataset_id in scoped: + path = rankings_dir / rankings_filename(task.name, dataset_id) + if not path.exists(): + raise RankingsArtifactMissing(path, task.name, dataset_id) + + header, scores = load_rankings_artifact(path) + if header.get("model_name") != model_name: + raise RankingsArtifactInvalid( + path, + f"model_name mismatch with sibling artifacts: header has " + f"'{header.get('model_name')}', expected '{model_name}'", + ) + dataset = task.datasets[dataset_id] + validate_header( + header=header, + dataset=dataset, + task_name=task.name, + dataset_id=dataset_id, + split=task.split.value, + path=path, + running_workrb_version=running_version, + ) + + start_time_eval = time.time() + prediction_matrix = materialize_prediction_matrix( + scores=scores, + num_queries=header["num_queries"], + num_targets=header["num_targets"], + ) + task_metrics = metrics.get(task.name) if metrics else None + metrics_dict = task.compute_metrics_from_prediction_matrix( + prediction_matrix=prediction_matrix, + dataset_id=dataset_id, + metrics=task_metrics, + ) + evaluation_time = time.time() - start_time_eval + + logger.info(f"* {dataset_id} ({task.get_size_oneliner(dataset_id)})") + _record_dataset_result( + results=results, + config=config, + task=task, + dataset_id=dataset_id, + metrics_dict=metrics_dict, + evaluation_time=evaluation_time, + ) + + results.metadata.total_evaluation_time = time.time() - start_time_benchmark + config.save_final_result_artifacts(results) + + logger.info(f"{'=' * 60}") + logger.info("✓ evaluate_rankings COMPLETE") + logger.info(f"Total time: {results.metadata.total_evaluation_time:.2f}s") + logger.info( + format_results( + results, + display_per_task=False, + display_per_task_group=False, + display_per_language=False, + display_overall=True, + 
language_aggregation_mode=language_aggregation_mode, + ) + ) + logger.info(f"{'=' * 60}") + return results + + +def _peek_artifact_origin(rankings_dir: Path) -> tuple[str, str | None]: + """Read ``(model_name, workrb_version)`` from the first artifact in ``rankings_dir``. + + All artifacts in one directory must agree on ``model_name``; later + artifacts that disagree raise :class:`RankingsArtifactInvalid` when + they are loaded for scoring. ``workrb_version`` is informational and + surfaced in ``BenchmarkMetadata.replayed_from_workrb_version`` so the + replay's ``results.json`` records which version produced the scores. + """ + json_files = sorted(rankings_dir.glob("*.json")) + if not json_files: + raise FileNotFoundError(f"No ranking artifacts (*.json) found in {rankings_dir}") + header, _ = load_rankings_artifact(json_files[0]) + model_name = header.get("model_name") + if not model_name: + raise RankingsArtifactInvalid(json_files[0], "header missing 'model_name'") + return model_name, header.get("workrb_version") + + def get_tasks_overview( tasks: Sequence[Task], dataset_ids_to_evaluate: dict[str, list[str]] | None = None, @@ -267,19 +499,25 @@ def _get_all_languages(tasks: Sequence[Task]) -> list[str]: def _get_dataset_ids_to_evaluate( tasks: Sequence[Task], language_aggregation_mode: LanguageAggregationMode, + execution_mode: ExecutionMode, ) -> dict[str, list[str]]: - """Compute which dataset IDs per task are compatible with the aggregation mode. + """Compute which dataset IDs per task are in scope for this run. - This is the single source of truth for the run's scope when - ``execution_mode`` is ``LAZY``. The returned dict drives the overview - display, total-evaluation count, and pending-work filtering. + Single source of truth for run scope. The returned dict drives the + overview display, total-evaluation count, and pending-work filtering. 
+ Under ``ExecutionMode.ALL`` every dataset is in scope; under + ``ExecutionMode.LAZY`` datasets incompatible with the chosen + aggregation mode are dropped (and a warning is logged for each). Parameters ---------- tasks : Sequence[Task] All tasks configured for this benchmark run. language_aggregation_mode : LanguageAggregationMode - The aggregation mode to check compatibility against. + The aggregation mode to check compatibility against (only used + when ``execution_mode`` is ``LAZY``). + execution_mode : ExecutionMode + ``LAZY`` filters incompatible datasets; ``ALL`` keeps everything. Returns ------- @@ -288,6 +526,9 @@ def _get_dataset_ids_to_evaluate( Tasks whose datasets are all incompatible still appear as keys with an empty list. """ + if execution_mode == ExecutionMode.ALL: + return {task.name: list(task.dataset_ids) for task in tasks} + if language_aggregation_mode == LanguageAggregationMode.SKIP_LANGUAGE_AGGREGATION: return {task.name: list(task.dataset_ids) for task in tasks} @@ -429,6 +670,7 @@ def _run_pending_work( results: BenchmarkResults, model: ModelInterface, metrics: dict[str, list[str]] | None, + save_rankings: bool, total_evaluations: int, ) -> BenchmarkResults: """Run pending evaluations. @@ -439,6 +681,7 @@ def _run_pending_work( results: BenchmarkResults object to store results. model: ModelInterface object to evaluate. metrics: Dictionary of task names to their custom metrics. + save_rankings: If True, save full ranking score artifacts for ranking tasks. total_evaluations: Total number of compatible evaluations (for progress display). 
""" # Run pending evaluations @@ -476,31 +719,37 @@ def _run_pending_work( try: start_time_eval = time.time() - dataset_results: dict[str, float] = task.evaluate( - model=model, metrics=task_metrics, dataset_id=dataset_id - ) + if save_rankings and isinstance(task, RankingTask): + prediction_matrix = task.compute_prediction_matrix( + model=model, dataset_id=dataset_id + ) + dataset_results = task.compute_metrics_from_prediction_matrix( + prediction_matrix=prediction_matrix, + dataset_id=dataset_id, + metrics=task_metrics, + ) + rankings_path = config.save_rankings_artifact( + task_name=task.name, + dataset_id=dataset_id, + split=task.split.value, + dataset=task.datasets[dataset_id], + prediction_matrix=prediction_matrix, + ) + logger.info(f"\tSaved ranking scores to: {rankings_path}") + else: + dataset_results: dict[str, float] = task.evaluate( + model=model, metrics=task_metrics, dataset_id=dataset_id + ) evaluation_time = time.time() - start_time_eval - # Store results - dataset_languages = task.get_dataset_languages(dataset_id) - results.task_results[task.name].datasetid_results[dataset_id] = MetricsResult( - evaluation_time=evaluation_time, + _record_dataset_result( + results=results, + config=config, + task=task, + dataset_id=dataset_id, metrics_dict=dataset_results, - input_languages=sorted( - lang.value for lang in dataset_languages.input_languages - ), - output_languages=sorted( - lang.value for lang in dataset_languages.output_languages - ), + evaluation_time=evaluation_time, ) - - # Save incremental results to checkpoint - if config: - config.save_results_checkpoint(results) - - # Show key metrics - key_metric = task.default_metrics[0] - logger.info(f"\t{key_metric}: {dataset_results[key_metric]:.3f}") run_idx += 1 except Exception as e: logger.error(f"Error: {e}") @@ -508,3 +757,31 @@ def _run_pending_work( logger.info(f"Completed {run_idx} / {total_evaluations} evaluations. 
") return results + + +def _record_dataset_result( + results: BenchmarkResults, + config: BenchmarkConfig, + task: Task, + dataset_id: str, + metrics_dict: dict[str, float], + evaluation_time: float, +) -> None: + """Attach a per-dataset metrics result, checkpoint, and log the key metric. + + Shared between :func:`evaluate` and :func:`evaluate_rankings` so both paths + produce identical ``BenchmarkResults`` bookkeeping. + """ + dataset_languages = task.get_dataset_languages(dataset_id) + results.task_results[task.name].datasetid_results[dataset_id] = MetricsResult( + evaluation_time=evaluation_time, + metrics_dict=metrics_dict, + input_languages=sorted(lang.value for lang in dataset_languages.input_languages), + output_languages=sorted(lang.value for lang in dataset_languages.output_languages), + ) + + if config: + config.save_results_checkpoint(results) + + key_metric = task.default_metrics[0] + logger.info(f"\t{key_metric}: {metrics_dict[key_metric]:.3f}") diff --git a/src/workrb/tasks/abstract/ranking_base.py b/src/workrb/tasks/abstract/ranking_base.py index 452ffbf..00a5842 100644 --- a/src/workrb/tasks/abstract/ranking_base.py +++ b/src/workrb/tasks/abstract/ranking_base.py @@ -8,6 +8,7 @@ from enum import Enum from typing import TYPE_CHECKING +import numpy as np import torch from workrb.metrics.ranking import calculate_ranking_metrics @@ -431,36 +432,48 @@ def evaluate( dict[str, float] Dictionary containing metric scores and evaluation metadata. 
""" - if metrics is None: - metrics = self.default_metrics + prediction_matrix = self.compute_prediction_matrix(model=model, dataset_id=dataset_id) + return self.compute_metrics_from_prediction_matrix( + prediction_matrix=prediction_matrix, + dataset_id=dataset_id, + metrics=metrics, + ) - # Retrieve dataset by ID + def compute_prediction_matrix( + self, + model: ModelInterface, + dataset_id: str = "en", + ) -> np.ndarray: + """Compute the ranking score matrix for a dataset.""" dataset = self.datasets[dataset_id] - queries = dataset.query_texts - targets = dataset.target_space - labels = dataset.target_indices - - # Get model predictions (similarity matrix) prediction_matrix = model.compute_rankings( - queries=queries, - targets=targets, + queries=dataset.query_texts, + targets=dataset.target_space, query_input_type=self.query_input_type, target_input_type=self.target_input_type, ) - - # Convert to numpy if needed if isinstance(prediction_matrix, torch.Tensor): prediction_matrix = prediction_matrix.cpu().float().numpy() + return prediction_matrix + def compute_metrics_from_prediction_matrix( + self, + prediction_matrix: np.ndarray, + dataset_id: str = "en", + metrics: list[str] | None = None, + ) -> dict[str, float]: + """Compute ranking metrics from a precomputed prediction matrix.""" + if metrics is None: + metrics = self.default_metrics + dataset = self.datasets[dataset_id] # Calculate metrics. When the dataset provides graded relevance, binary # metrics consume only positives with relevance >= binary_relevance_threshold; # nDCG still sees the full graded label list. 
- metric_results = calculate_ranking_metrics( + return calculate_ranking_metrics( prediction_matrix=prediction_matrix, - pos_label_idxs=labels, + pos_label_idxs=dataset.target_indices, metrics=metrics, pos_label_relevance=dataset.target_relevance, binary_relevance_threshold=self.binary_relevance_threshold, ) - - return metric_results + diff --git a/tests/test_multi_dataset_task.py b/tests/test_multi_dataset_task.py index 07b2df1..d09a525 100644 --- a/tests/test_multi_dataset_task.py +++ b/tests/test_multi_dataset_task.py @@ -22,6 +22,7 @@ from workrb.tasks import ESCOJob2SkillRanking, RankingDataset from workrb.types import ( DatasetLanguages, + ExecutionMode, Language, LanguageAggregationMode, get_language_grouping_key, @@ -476,7 +477,9 @@ def test_monolingual_only_skips_crosslingual(self): ), }, ) - result = _get_dataset_ids_to_evaluate([task], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [task], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {"task1": ["en"]} def test_group_input_keeps_crosslingual_singleton_input(self): @@ -495,7 +498,7 @@ def test_group_input_keeps_crosslingual_singleton_input(self): }, ) result = _get_dataset_ids_to_evaluate( - [task], LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES + [task], LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES, ExecutionMode.LAZY ) assert result == {"task1": ["en_de"]} @@ -531,7 +534,9 @@ def test_monolingual_only_mixed_task_keeps_only_monolingual(self): ), }, ) - result = _get_dataset_ids_to_evaluate([task], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [task], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {"melo_task": ["en", "de"]} def test_group_input_mixed_task_keeps_singleton_input(self): @@ -562,13 +567,15 @@ def test_group_input_mixed_task_keeps_singleton_input(self): }, ) result = _get_dataset_ids_to_evaluate( - [task], 
LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES + [task], LanguageAggregationMode.CROSSLINGUAL_GROUP_INPUT_LANGUAGES, ExecutionMode.LAZY ) assert result == {"melo_task": ["en", "en_de", "fr_de"]} def test_no_tasks(self): """Empty task list returns empty dict.""" - result = _get_dataset_ids_to_evaluate([], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {} def test_all_datasets_incompatible(self): @@ -586,7 +593,9 @@ def test_all_datasets_incompatible(self): ), }, ) - result = _get_dataset_ids_to_evaluate([task], LanguageAggregationMode.MONOLINGUAL_ONLY) + result = _get_dataset_ids_to_evaluate( + [task], LanguageAggregationMode.MONOLINGUAL_ONLY, ExecutionMode.LAZY + ) assert result == {"task1": []} def test_skip_language_aggregation_keeps_all(self): @@ -609,7 +618,7 @@ def test_skip_language_aggregation_keeps_all(self): }, ) result = _get_dataset_ids_to_evaluate( - [task], LanguageAggregationMode.SKIP_LANGUAGE_AGGREGATION + [task], LanguageAggregationMode.SKIP_LANGUAGE_AGGREGATION, ExecutionMode.LAZY ) assert result == {"task1": ["en", "en_de", "multi"]} diff --git a/tests/test_save_rankings_artifacts.py b/tests/test_save_rankings_artifacts.py new file mode 100644 index 0000000..b17bdc2 --- /dev/null +++ b/tests/test_save_rankings_artifacts.py @@ -0,0 +1,431 @@ +"""Tests for ranking artifact persistence and replay.""" + +import json +import logging +import shutil +from pathlib import Path + +import pytest +import torch + +import workrb +from workrb.models.base import ModelInterface +from workrb.rankings import ( + SCHEMA_HEADER_FIELDS, + SCHEMA_VERSION, + RankingsArtifactInvalid, + RankingsArtifactMissing, +) +from workrb.tasks.abstract.base import DatasetSplit, LabelType, Language +from workrb.tasks.abstract.ranking_base import RankingDataset, RankingTask, RankingTaskGroup +from workrb.types import ModelInputType + + +class 
TinyRankingTask(RankingTask): + """Minimal ranking task used to test ranking artifact persistence.""" + + @property + def name(self) -> str: + return "Tiny Ranking Task" + + @property + def description(self) -> str: + return "Tiny in-memory ranking task for tests." + + @property + def supported_query_languages(self) -> list[Language]: + return [Language.EN] + + @property + def supported_target_languages(self) -> list[Language]: + return [Language.EN] + + @property + def task_group(self) -> RankingTaskGroup: + return RankingTaskGroup.SKILL_EXTRACTION + + @property + def label_type(self) -> LabelType: + return LabelType.MULTI_LABEL + + @property + def query_input_type(self) -> ModelInputType: + return ModelInputType.SKILL_SENTENCE + + @property + def target_input_type(self) -> ModelInputType: + return ModelInputType.SKILL_NAME + + def load_dataset(self, dataset_id: str, split: DatasetSplit) -> RankingDataset: + return RankingDataset( + query_texts=["query one", "query two"], + target_indices=[[0], [1]], + target_space=["target_a", "target_b", "target_c"], + dataset_id=dataset_id, + ) + + +class TinyDeterministicModel(ModelInterface): + """Deterministic model with fixed ranking scores for tests.""" + + @property + def name(self) -> str: + return "tiny-deterministic-model" + + @property + def description(self) -> str: + return "Tiny deterministic model for ranking artifact tests." + + def _compute_rankings( + self, + queries: list[str], + targets: list[str], + query_input_type: ModelInputType, + target_input_type: ModelInputType, + ) -> torch.Tensor: + # 2 queries x 3 targets, including 0.0 and a negative score to confirm + # both roundtrip exactly (no sparsity). 
+ return torch.tensor( + [ + [0.1, 0.0, -0.3], + [0.4, 0.5, 0.6], + ], + dtype=torch.float32, + ) + + def _compute_classification( + self, + texts: list[str], + targets: list[str], + input_type: ModelInputType, + target_input_type: ModelInputType | None = None, + ) -> torch.Tensor: + return torch.zeros((len(texts), len(targets))) + + @property + def classification_label_space(self) -> list[str] | None: + return None + + +def _fresh_dir(name: str) -> Path: + path = Path("tmp") / name + if path.exists(): + shutil.rmtree(path, ignore_errors=True) + return path + + +def _run_and_get_rankings_dir(output_folder: Path) -> Path: + model = TinyDeterministicModel() + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + workrb.evaluate( + model=model, + tasks=tasks, + output_folder=str(output_folder), + force_restart=True, + save_rankings=True, + ) + return output_folder / "rankings" / model.name + + +def test_evaluate_saves_rankings_artifact_with_new_schema(): + """evaluate(save_rankings=True) writes one JSON per ranking dataset: a header plus dense scores keyed by stringified indices.""" + output_folder = _fresh_dir("rankings_artifact_test_enabled") + rankings_dir = _run_and_get_rankings_dir(output_folder) + + ranking_files = list(rankings_dir.glob("*.json")) + assert len(ranking_files) == 1 + + with open(ranking_files[0]) as f: + payload = json.load(f) + + assert set(payload.keys()) == {"header", "scores"} + + header = payload["header"] + assert header["schema_version"] == 1 + assert header["model_name"] == "tiny-deterministic-model" + assert header["task_name"] == "Tiny Ranking Task" + assert header["dataset_id"] == "en" + assert header["split"] == "test" + assert header["num_queries"] == 2 + assert header["num_targets"] == 3 + assert header["first_query_text"] == "query one" + assert header["last_query_text"] == "query two" + assert header["first_target_text"] == "target_a" + assert header["last_target_text"] == "target_c" + assert 
isinstance(header["workrb_version"], str) and header["workrb_version"] + + scores = payload["scores"] + assert set(scores.keys()) == {"0", "1"} + # Every (q, t) cell is stored, including 0.0 and negatives. + assert set(scores["0"].keys()) == {"0", "1", "2"} + assert scores["0"]["0"] == pytest.approx(0.1) + assert scores["0"]["1"] == pytest.approx(0.0) + assert scores["0"]["2"] == pytest.approx(-0.3) + assert set(scores["1"].keys()) == {"0", "1", "2"} + assert scores["1"]["0"] == pytest.approx(0.4) + assert scores["1"]["1"] == pytest.approx(0.5) + assert scores["1"]["2"] == pytest.approx(0.6) + + +def test_evaluate_does_not_save_rankings_artifact_by_default(): + """evaluate() without save_rankings does not create a rankings/ directory.""" + output_folder = _fresh_dir("rankings_artifact_test_disabled") + model = TinyDeterministicModel() + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + workrb.evaluate( + model=model, + tasks=tasks, + output_folder=str(output_folder), + force_restart=True, + ) + + assert not (output_folder / "rankings").exists() + + +def test_evaluate_rankings_roundtrip_parity(): + """evaluate_rankings on saved artifacts reproduces metrics from evaluate(save_rankings=True).""" + write_dir = _fresh_dir("rankings_artifact_roundtrip_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + + with open(write_dir / "results.json") as f: + write_results = json.load(f) + write_metrics = write_results["task_results"]["Tiny Ranking Task"]["datasetid_results"]["en"][ + "metrics_dict" + ] + + read_output = _fresh_dir("rankings_artifact_roundtrip_read") + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + replay = workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + replay_metrics = replay.task_results["Tiny Ranking Task"].datasetid_results["en"].metrics_dict + assert set(replay_metrics.keys()) == set(write_metrics.keys()) + for key, value in 
write_metrics.items(): + assert replay_metrics[key] == pytest.approx(value) + + # Replay records which workrb version wrote the source artifacts; + # the original evaluate() run leaves the field unset. + assert write_results["metadata"].get("replayed_from_workrb_version") is None + artifact_header = json.loads(next(rankings_dir.glob("*.json")).read_text())["header"] + assert replay.metadata.replayed_from_workrb_version == artifact_header["workrb_version"] + + +def test_evaluate_rankings_missing_artifact_raises(): + """A missing artifact file raises RankingsArtifactMissing.""" + write_dir = _fresh_dir("rankings_artifact_missing_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + + # Remove the only artifact, then add a stub so the directory peek succeeds + # but the per-task lookup fails. + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + stub_payload = json.loads(files[0].read_text()) + files[0].unlink() + decoy = rankings_dir / "Some_Other_Task__xx.json" + decoy.write_text(json.dumps(stub_payload)) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_missing_read") + with pytest.raises(RankingsArtifactMissing): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def _hand_edit_header(rankings_dir: Path, **header_overrides) -> Path: + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + path = files[0] + payload = json.loads(path.read_text()) + payload["header"].update(header_overrides) + path.write_text(json.dumps(payload)) + return path + + +def test_evaluate_rankings_size_mismatch_raises(): + write_dir = _fresh_dir("rankings_artifact_size_mismatch_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, num_queries=99) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = 
_fresh_dir("rankings_artifact_size_mismatch_read") + with pytest.raises(RankingsArtifactInvalid, match="num_queries"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_canary_mismatch_raises(): + write_dir = _fresh_dir("rankings_artifact_canary_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, first_query_text="this is not the original query") + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_canary_read") + with pytest.raises(RankingsArtifactInvalid, match="first_query_text"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_split_mismatch_raises(): + """An artifact written for one split cannot be replayed against another.""" + write_dir = _fresh_dir("rankings_artifact_split_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, split="val") + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_split_read") + with pytest.raises(RankingsArtifactInvalid, match="split"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_writer_header_matches_pinned_schema(): + """The writer must emit exactly the header fields pinned in SCHEMA_HEADER_FIELDS. + + This is the gate against silently changing the on-disk schema. If you add, + remove, or rename a header field, bump SCHEMA_VERSION and add a new entry + to SCHEMA_HEADER_FIELDS rather than editing the current one in place. 
+ """ + write_dir = _fresh_dir("rankings_artifact_pinned_schema") + rankings_dir = _run_and_get_rankings_dir(write_dir) + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + payload = json.loads(files[0].read_text()) + + written_fields = frozenset(payload["header"].keys()) + pinned_fields = SCHEMA_HEADER_FIELDS[SCHEMA_VERSION] + assert written_fields == pinned_fields, ( + f"writer header drift from SCHEMA_VERSION={SCHEMA_VERSION}: " + f"added={sorted(written_fields - pinned_fields)}, " + f"removed={sorted(pinned_fields - written_fields)}. " + "Bump SCHEMA_VERSION and add a new entry in SCHEMA_HEADER_FIELDS." + ) + + +def test_evaluate_rankings_unknown_schema_version_raises(): + """An unknown schema_version is a hard reject (independent of workrb_version).""" + write_dir = _fresh_dir("rankings_artifact_schema_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, schema_version=999) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_schema_read") + with pytest.raises(RankingsArtifactInvalid, match="schema_version"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_save_rankings_rejects_non_finite_scores(): + """The writer refuses NaN/inf because JSON cannot represent them safely.""" + import numpy as np + + from workrb.config import BenchmarkConfig + + dataset = TinyRankingTask( + split=DatasetSplit.TEST, languages=[Language.EN] + ).datasets["en"] + config = BenchmarkConfig( + model_name="tiny", + output_folder=str(_fresh_dir("rankings_artifact_non_finite_write")), + ) + matrix = np.array([[0.1, float("nan"), 0.3], [0.4, 0.5, 0.6]], dtype=np.float32) + with pytest.raises(ValueError, match="non-finite"): + config.save_rankings_artifact( + task_name="Tiny Ranking Task", + dataset_id="en", + split="test", + dataset=dataset, + prediction_matrix=matrix, + ) + + +def 
test_evaluate_rankings_rejects_non_finite_scores_in_artifact(): + """A hand-edited NaN in the artifact is rejected at load time.""" + write_dir = _fresh_dir("rankings_artifact_non_finite_read") + rankings_dir = _run_and_get_rankings_dir(write_dir) + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + # Inject NaN by editing the parsed structure and re-emitting with + # allow_nan=True. json.load (used by the reader) accepts the non-standard + # NaN token, which is exactly what validate_header/load_rankings_artifact + # must catch. + payload = json.loads(files[0].read_text()) + payload["scores"]["0"]["0"] = float("nan") + files[0].write_text(json.dumps(payload, allow_nan=True)) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_non_finite_read_out") + with pytest.raises(RankingsArtifactInvalid, match="non-finite"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_rejects_out_of_bounds_target_index(): + """A target_index outside [0, num_targets) is rejected with IndexError.""" + write_dir = _fresh_dir("rankings_artifact_oob_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + files = list(rankings_dir.glob("*.json")) + assert len(files) == 1 + payload = json.loads(files[0].read_text()) + # num_targets is 3 in TinyRankingTask, so 999 is out of bounds. 
+ payload["scores"]["0"]["999"] = 0.42 + files[0].write_text(json.dumps(payload)) + + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_oob_read") + with pytest.raises(IndexError, match="target_index 999"): + workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + + +def test_evaluate_rankings_version_mismatch_warns_but_proceeds(): + """workrb_version mismatch only logs a warning; metrics still computed.""" + write_dir = _fresh_dir("rankings_artifact_version_write") + rankings_dir = _run_and_get_rankings_dir(write_dir) + _hand_edit_header(rankings_dir, workrb_version="0.0.0-test-fixture") + + records: list[logging.LogRecord] = [] + + class _Capture(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + records.append(record) + + capture = _Capture(level=logging.WARNING) + rankings_logger = logging.getLogger("workrb.rankings") + rankings_logger.addHandler(capture) + try: + tasks = [TinyRankingTask(split=DatasetSplit.TEST, languages=[Language.EN])] + read_output = _fresh_dir("rankings_artifact_version_read") + replay = workrb.evaluate_rankings( + rankings_dir=rankings_dir, + tasks=tasks, + output_folder=str(read_output), + ) + finally: + rankings_logger.removeHandler(capture) + + assert any("0.0.0-test-fixture" in r.getMessage() for r in records) + assert "Tiny Ranking Task" in replay.task_results