diff --git a/toolchain/bootstrap/lint.sh b/toolchain/bootstrap/lint.sh index 6ff9ca88eb..c22b772f86 100644 --- a/toolchain/bootstrap/lint.sh +++ b/toolchain/bootstrap/lint.sh @@ -37,6 +37,7 @@ if [ "$RUN_TESTS" = true ]; then python3 -m unittest mfc.params_tests.test_registry mfc.params_tests.test_definitions mfc.params_tests.test_validate mfc.params_tests.test_integration -v python3 -m unittest mfc.cli.test_cli -v python3 -m unittest mfc.viz.test_viz -v + python3 -m unittest mfc.run.test_archive -v cd - > /dev/null fi diff --git a/toolchain/mfc/cli/commands.py b/toolchain/mfc/cli/commands.py index e98003aa74..207ab08dbf 100644 --- a/toolchain/mfc/cli/commands.py +++ b/toolchain/mfc/cli/commands.py @@ -302,6 +302,21 @@ action=ArgAction.STORE_TRUE, default=False, ), + Argument( + name="archive", + help="(Interactive) Archive case inputs and outputs to PATH after the run completes.", + default=None, + metavar="PATH", + completion=Completion(type=CompletionType.DIRECTORIES), + ), + Argument( + name="archive-format", + help="(Interactive) Archive container format: dir (default), tar, or tar.zst.", + choices=["dir", "tar", "tar.zst"], + default="dir", + dest="archive_format", + completion=Completion(type=CompletionType.CHOICES, choices=["dir", "tar", "tar.zst"]), + ), # Profiler arguments with REMAINDER Argument( name="ncu", @@ -333,6 +348,8 @@ Example("./mfc.sh run case.py -n 4", "Run with 4 MPI ranks"), Example("./mfc.sh run case.py --case-optimization -j 8", "10x faster with case optimization!"), Example("./mfc.sh run case.py -e batch -N 2 -n 4", "Submit batch job: 2 nodes, 4 ranks/node"), + Example("./mfc.sh run case.py --archive /mnt/nas/mfc-runs", "Archive run into /mnt/nas/mfc-runs/-/"), + Example("./mfc.sh run case.py --archive /mnt/nas/mfc-runs --archive-format tar.zst", "Archive as a compressed tarball"), ], key_options=[ ("--case-optimization", "Hard-code params for 10x speedup!"), @@ -341,6 +358,8 @@ ("-e, --engine", "interactive or batch"), ("-a, --account", "Account to charge (batch)"), ("-w, --walltime", "Wall time limit (batch)"), + ("--archive PATH", "Archive inputs+outputs after interactive run"), + ("--archive-format FMT", "Archive format: dir, tar, tar.zst"), ], ) diff --git a/toolchain/mfc/run/archive.py b/toolchain/mfc/run/archive.py new file mode 100644 index 0000000000..0c3848060a --- /dev/null +++ b/toolchain/mfc/run/archive.py @@ -0,0 +1,257 @@ +import dataclasses +import datetime +import os +import shutil +import sys +import tarfile +import tempfile + +from ..common import MFCException, does_command_exist, file_dump_yaml, generate_git_tagline, system +from ..printer import cons +from ..state import ARG, CFG +from . import input + +ARTIFACT_FILENAMES = [ + "equations.dat", + "run_time.inf", + "time_data.dat", + "io_time_data.dat", + "fort.1", + "pre_time_data.dat", +] + +ARTIFACT_DIRNAMES = [ + "D", + "p_all", + "restart_data", + "silo_hdf5", +] + + +def __collect_sources(case: input.MFCInputFile, targets) -> list: + dirpath = case.dirpath + name = ARG("name") + sources = [] + + if os.path.isfile(case.filename): + sources.append(case.filename) + + for target in targets: + inp = os.path.join(dirpath, f"{target.name}.inp") + if os.path.isfile(inp): + sources.append(inp) + + for candidate in ARTIFACT_FILENAMES: + candidate_path = os.path.join(dirpath, candidate) + if os.path.isfile(candidate_path): + sources.append(candidate_path) + + for script_ext in ("sh", "bat"): + script = os.path.join(dirpath, f"{name}.{script_ext}") + if os.path.isfile(script): + sources.append(script) + + out_file = os.path.join(dirpath, f"{name}.out") + if os.path.isfile(out_file): + sources.append(out_file) + + summary = ARG("output_summary") + if summary is not None and os.path.isfile(summary): + summary_abs = os.path.abspath(summary) + dirpath_abs = os.path.abspath(dirpath) + if os.path.commonpath([summary_abs, dirpath_abs]) == dirpath_abs: + sources.append(summary_abs) + else: + cons.print(f"[yellow]Archive: skipping output_summary outside case dir: {summary_abs}[/yellow]") + + for dirname in ARTIFACT_DIRNAMES: + candidate_dir = os.path.join(dirpath, dirname) + if os.path.isdir(candidate_dir): + sources.append(candidate_dir) + + return sources + + +def __build_manifest(case: input.MFCInputFile, targets, sources: list, archive_path: str, archive_format: str) -> dict: + dirpath = case.dirpath + relative_sources = [] + for src in sources: + try: + rel = os.path.relpath(src, dirpath) + except ValueError: + rel = src + relative_sources.append(rel) + + return { + "timestamp": datetime.datetime.now().astimezone().isoformat(), + "source_case": os.path.abspath(case.filename), + "source_dir": os.path.abspath(dirpath), + "invocation": sys.argv[1:], + "git": generate_git_tagline(), + "targets": [t.name for t in targets], + "archive_format": archive_format, + "archive_path": archive_path, + "build_lock": dataclasses.asdict(CFG()), + "contents": sorted(relative_sources), + } + + +def __copy_dir(sources: list, case: input.MFCInputFile, dest: str) -> None: + os.makedirs(dest, exist_ok=True) + dirpath = case.dirpath + + for src in sources: + try: + rel = os.path.relpath(src, dirpath) + except ValueError: + rel = os.path.basename(src) + + target_path = os.path.join(dest, rel) + os.makedirs(os.path.dirname(target_path), exist_ok=True) + + if os.path.isdir(src): + shutil.copytree(src, target_path, dirs_exist_ok=True, symlinks=True) + else: + shutil.copy2(src, target_path) + + +def __write_tar(sources: list, case: input.MFCInputFile, dest: str, compressed: bool, manifest_file: str) -> None: + dirpath = case.dirpath + arcroot = os.path.basename(dest).removesuffix(".tar.zst").removesuffix(".tar") + + def rel_for(path: str) -> str: + try: + return os.path.relpath(path, dirpath) + except ValueError: + return os.path.basename(path) + + if compressed: + if not does_command_exist("tar"): + raise MFCException("Archive: 'tar' binary not found; required for --archive-format tar.zst.") + + with tempfile.TemporaryDirectory() as staging: + staging_root = os.path.join(staging, arcroot) + os.makedirs(staging_root, exist_ok=True) + + for src in sources: + rel = rel_for(src) + if rel.startswith(".."): + raise MFCException(f"Archive: refusing to include source outside case dir: {src}") + target = os.path.join(staging_root, rel) + os.makedirs(os.path.dirname(target), exist_ok=True) + if os.path.isdir(src): + shutil.copytree(src, target, symlinks=True) + else: + shutil.copy2(src, target) + + shutil.copy2(manifest_file, os.path.join(staging_root, "manifest.yaml")) + + result = system( + ["tar", "--zstd", "-cf", dest, "-C", staging, arcroot], + print_cmd=False, + ) + if result.returncode != 0: + raise MFCException(f"Archive: 'tar --zstd' failed with exit code {result.returncode}. Ensure GNU tar >= 1.31.") + return + + with tarfile.open(dest, "w") as tf: + for src in sources: + rel = rel_for(src) + if rel.startswith(".."): + raise MFCException(f"Archive: refusing to include source outside case dir: {src}") + tf.add(src, arcname=os.path.join(arcroot, rel)) + tf.add(manifest_file, arcname=os.path.join(arcroot, "manifest.yaml")) + + +@dataclasses.dataclass +class ArchivePlan: + dest: str + archive_format: str + stem: str + + +def plan_archive(case): + """Validate --archive settings and reserve a unique destination path. + + Runs before the simulation executes so bad paths, bad formats, or + unwritable roots fail fast. Returns None if --archive is unset. + + The stem is `-` where `case_dir_name` is + the basename of the directory holding case.py. This way archives + from different cases dropped in the same archive root are + self-identifying (e.g. 1D_sodshocktube-20260424-123045) rather + than all sharing the generic --name default. + + If the computed path already exists, appends "-2", "-3", ... to + the stem until a free name is found, so two runs starting in the + same second never collide. + """ + archive_root = ARG("archive") + if archive_root is None: + return None + + archive_format = ARG("archive_format") or "dir" + suffix_map = {"dir": "", "tar": ".tar", "tar.zst": ".tar.zst"} + if archive_format not in suffix_map: + raise MFCException(f"Archive: unsupported format '{archive_format}'. Must be one of: {', '.join(suffix_map)}.") + suffix = suffix_map[archive_format] + + # Derive the stem from the case's parent directory name so archives + # from different cases are distinguishable. Fall back to "case" if + # the case somehow lives at the filesystem root. + case_dir_name = os.path.basename(os.path.abspath(case.dirpath).rstrip(os.sep)) or "case" + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + base_stem = f"{case_dir_name}-{timestamp}" + + archive_root = os.path.abspath(os.path.expanduser(archive_root)) + try: + os.makedirs(archive_root, exist_ok=True) + except OSError as e: + raise MFCException(f"Archive: cannot create archive root {archive_root}: {e}") from e + + stem = base_stem + dest = os.path.join(archive_root, stem + suffix) + counter = 2 + while os.path.exists(dest): + stem = f"{base_stem}-{counter}" + dest = os.path.join(archive_root, stem + suffix) + counter += 1 + + if stem != base_stem: + cons.print(f"[yellow]Archive: destination existed, using {os.path.basename(dest)} to avoid collision.[/yellow]") + + return ArchivePlan(dest=dest, archive_format=archive_format, stem=stem) + + +def archive(plan: "ArchivePlan", case: input.MFCInputFile, targets) -> None: + """Write the planned archive. Caller must have obtained `plan` via plan_archive().""" + dest = plan.dest + archive_format = plan.archive_format + + sources = __collect_sources(case, targets) + if not sources: + cons.print("[yellow]Archive: no artifacts found to archive; skipping.[/yellow]") + return + + cons.print() + cons.print(f"[bold]Archiving[/bold] to [magenta]{dest}[/magenta] ({archive_format})") + + with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as tmp: + manifest_path = tmp.name + + cons.indent() + try: + manifest = __build_manifest(case, targets, sources, dest, archive_format) + file_dump_yaml(manifest_path, manifest) + + if archive_format == "dir": + __copy_dir(sources, case, dest) + shutil.copy2(manifest_path, os.path.join(dest, "manifest.yaml")) + else: + __write_tar(sources, case, dest, compressed=(archive_format == "tar.zst"), manifest_file=manifest_path) + + cons.print(f"Wrote [magenta]{len(sources)}[/magenta] artifact(s) + manifest.yaml.") + finally: + cons.unindent() + if os.path.isfile(manifest_path): + os.unlink(manifest_path) diff --git a/toolchain/mfc/run/run.py b/toolchain/mfc/run/run.py index 8157f0090d..82e886c064 100644 --- a/toolchain/mfc/run/run.py +++ b/toolchain/mfc/run/run.py @@ -13,6 +13,7 @@ from ..common import MFC_ROOT_DIR, MFC_TEMPLATE_DIR, MFCException, does_command_exist, file_dump_yaml, file_read, file_write, format_list_to_string, isspace, system from ..printer import cons from ..state import ARG, ARGS, CFG, gpuConfigOptions +from . import archive as archive_mod from . import input, queues @@ -168,6 +169,19 @@ def run(targets=None, case=None): qsystem = queues.get_system() cons.print(f"Using queue system [magenta]{qsystem.name}[/magenta].") + # Pre-flight --archive: validate the path, format, and reserve a + # unique destination BEFORE pre_process runs. Failing here saves + # the user a full simulation if the archive settings are bad. + # Batch jobs skip archiving entirely (outputs land asynchronously). + archive_plan = None + if ARG("archive") is not None: + if isinstance(qsystem, queues.InteractiveSystem): + archive_plan = archive_mod.plan_archive(case) + if verbosity >= 1: + cons.print(f" [dim]Archive destination: {archive_plan.dest}[/dim]") + else: + cons.print("[yellow]--archive is ignored for batch submissions (outputs are produced asynchronously).[/yellow]") + # At verbosity >= 1, show more details about what's happening if verbosity >= 1: cons.print(f" [dim]Targets: {', '.join(t.name for t in targets)}[/dim]") @@ -191,3 +205,6 @@ def run(targets=None, case=None): cons.print("[bold]Executing simulation...[/bold]") __execute_job_script(qsystem) + + if archive_plan is not None: + archive_mod.archive(archive_plan, case, targets) diff --git a/toolchain/mfc/run/test_archive.py b/toolchain/mfc/run/test_archive.py new file mode 100644 index 0000000000..08de201d96 --- /dev/null +++ b/toolchain/mfc/run/test_archive.py @@ -0,0 +1,227 @@ +""" +Tests for the archive module. + +Covers source collection, archive format round-trips (dir, tar, tar.zst), +the no-op path when --archive is unset, destination-collision tally +fallback, and plan_archive() error cases. +""" + +import datetime +import os +import shutil +import subprocess +import tarfile +import tempfile +import types +import unittest +from contextlib import contextmanager +from unittest.mock import patch + +from .. import state + + +def _make_fake_case(dirpath: str): + from .input import MFCInputFile + + case_py = os.path.join(dirpath, "case.py") + with open(case_py, "w") as f: + f.write("# fake case\n") + with open(os.path.join(dirpath, "simulation.inp"), "w") as f: + f.write("&user_inputs /\n") + with open(os.path.join(dirpath, "equations.dat"), "w") as f: + f.write("eq\n") + with open(os.path.join(dirpath, "MFC.out"), "w") as f: + f.write("log\n") + os.makedirs(os.path.join(dirpath, "D")) + with open(os.path.join(dirpath, "D", "output.dat"), "w") as f: + f.write("data\n") + + return MFCInputFile(case_py, dirpath, {}) + + +def _fake_targets(): + return [types.SimpleNamespace(name="simulation")] + + +def _collect_sources_fn(): + # Module-level dunder names aren't mangled; attribute access inside a class body + # would be, so we reach through __dict__. + from . import archive + + return archive.__dict__["__collect_sources"] + + +class _StateSandbox(unittest.TestCase): + def setUp(self): + self._saved_gARG = dict(state.gARG) + state.gARG.update({"name": "MFC", "output_summary": None}) + + def tearDown(self): + state.gARG.clear() + state.gARG.update(self._saved_gARG) + + +class TestCollectSources(_StateSandbox): + def test_finds_case_namelist_and_artifacts(self): + collect = _collect_sources_fn() + + with tempfile.TemporaryDirectory() as tmp: + case = _make_fake_case(tmp) + sources = collect(case, _fake_targets()) + + names = {os.path.basename(s) for s in sources} + self.assertIn("case.py", names) + self.assertIn("simulation.inp", names) + self.assertIn("equations.dat", names) + self.assertIn("MFC.out", names) + self.assertIn("D", names) + + def test_skips_missing_artifacts(self): + collect = _collect_sources_fn() + + with tempfile.TemporaryDirectory() as tmp: + case = _make_fake_case(tmp) + sources = collect(case, _fake_targets()) + + names = {os.path.basename(s) for s in sources} + self.assertNotIn("time_data.dat", names) + self.assertNotIn("restart_data", names) + self.assertNotIn("p_all", names) + + +@contextmanager +def _run_archive(fmt: str): + from . import archive as archive_mod + + src = tempfile.mkdtemp() + dest_root = tempfile.mkdtemp() + try: + state.gARG["archive"] = dest_root + state.gARG["archive_format"] = fmt + case = _make_fake_case(src) + plan = archive_mod.plan_archive(case) + archive_mod.archive(plan, case, _fake_targets()) + entries = sorted(os.listdir(dest_root)) + assert len(entries) == 1, f"expected one archive entry, got {entries}" + yield os.path.join(dest_root, entries[0]) + finally: + shutil.rmtree(src, ignore_errors=True) + shutil.rmtree(dest_root, ignore_errors=True) + + +class TestArchiveFormats(_StateSandbox): + def test_format_dir(self): + with _run_archive("dir") as path: + self.assertTrue(os.path.isdir(path)) + self.assertTrue(os.path.isfile(os.path.join(path, "manifest.yaml"))) + self.assertTrue(os.path.isfile(os.path.join(path, "case.py"))) + self.assertTrue(os.path.isfile(os.path.join(path, "simulation.inp"))) + self.assertTrue(os.path.isdir(os.path.join(path, "D"))) + self.assertTrue(os.path.isfile(os.path.join(path, "D", "output.dat"))) + + def test_format_tar(self): + with _run_archive("tar") as path: + self.assertTrue(path.endswith(".tar")) + self.assertTrue(tarfile.is_tarfile(path)) + with tarfile.open(path) as tf: + names = tf.getnames() + base = os.path.basename(path)[: -len(".tar")] + self.assertIn(f"{base}/manifest.yaml", names) + self.assertIn(f"{base}/case.py", names) + self.assertIn(f"{base}/D/output.dat", names) + + def test_format_tar_zst(self): + try: + r = subprocess.run(["tar", "--zstd", "--version"], capture_output=True, check=False, timeout=5) + if r.returncode != 0: + self.skipTest("tar --zstd not available") + except (FileNotFoundError, subprocess.TimeoutExpired): + self.skipTest("tar --zstd not available") + + with _run_archive("tar.zst") as path: + self.assertTrue(path.endswith(".tar.zst")) + self.assertGreater(os.path.getsize(path), 0) + + listing = subprocess.run(["tar", "--zstd", "-tf", path], capture_output=True, text=True, check=True) + base = os.path.basename(path)[: -len(".tar.zst")] + self.assertIn(f"{base}/manifest.yaml", listing.stdout) + self.assertIn(f"{base}/case.py", listing.stdout) + + +class TestArchiveBehavior(_StateSandbox): + def test_plan_returns_none_when_archive_unset(self): + from . import archive as archive_mod + + state.gARG["archive"] = None + state.gARG["archive_format"] = "dir" + # No case needed since plan_archive returns before touching it. + self.assertIsNone(archive_mod.plan_archive(case=None)) + + def test_plan_bad_format_raises(self): + from ..common import MFCException + from . import archive as archive_mod + + with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dest_root: + state.gARG["archive"] = dest_root + state.gARG["archive_format"] = "bogus" + case = _make_fake_case(src) + with self.assertRaises(MFCException): + archive_mod.plan_archive(case) + + def test_plan_uses_case_dir_name_as_stem(self): + from . import archive as archive_mod + + fixed = datetime.datetime(2026, 1, 1, 12, 0, 0) + + with tempfile.TemporaryDirectory() as parent, tempfile.TemporaryDirectory() as dest_root, patch("mfc.run.archive.datetime.datetime") as MockDT: + MockDT.now.return_value = fixed + case_dir = os.path.join(parent, "my_cool_case") + os.makedirs(case_dir) + state.gARG["archive"] = dest_root + state.gARG["archive_format"] = "dir" + case = _make_fake_case(case_dir) + + plan = archive_mod.plan_archive(case) + + self.assertEqual(plan.stem, "my_cool_case-20260101-120000") + self.assertTrue(plan.dest.endswith("my_cool_case-20260101-120000")) + + def test_plan_collision_gets_tally_suffix(self): + from . import archive as archive_mod + + fixed = datetime.datetime(2026, 1, 1, 12, 0, 0) + + with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dest_root, patch("mfc.run.archive.datetime.datetime") as MockDT: + MockDT.now.return_value = fixed + state.gARG["archive"] = dest_root + state.gARG["archive_format"] = "dir" + case = _make_fake_case(src) + + plan1 = archive_mod.plan_archive(case) + os.makedirs(plan1.dest) # simulate an existing archive at that path + plan2 = archive_mod.plan_archive(case) + os.makedirs(plan2.dest) + plan3 = archive_mod.plan_archive(case) + + self.assertTrue(plan2.dest.endswith("-2")) + self.assertTrue(plan3.dest.endswith("-3")) + self.assertNotEqual(plan1.dest, plan2.dest) + self.assertNotEqual(plan2.dest, plan3.dest) + + def test_output_summary_outside_case_dir_is_skipped(self): + collect = _collect_sources_fn() + + with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as elsewhere: + outside = os.path.join(elsewhere, "summary.yaml") + with open(outside, "w") as f: + f.write("k: v\n") + + state.gARG["output_summary"] = outside + case = _make_fake_case(src) + sources = collect(case, _fake_targets()) + + self.assertNotIn(outside, sources) + + +if __name__ == "__main__": + unittest.main()