From 22fdb193fc330329ce49239083087f3d53479c12 Mon Sep 17 00:00:00 2001 From: Mika Date: Fri, 3 Apr 2026 10:57:04 +0000 Subject: [PATCH] Add artifact.preflight_checker/src/artifact_preflight_checker/cli.py --- .../src/artifact_preflight_checker/cli.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 artifact.preflight_checker/src/artifact_preflight_checker/cli.py diff --git a/artifact.preflight_checker/src/artifact_preflight_checker/cli.py b/artifact.preflight_checker/src/artifact_preflight_checker/cli.py new file mode 100644 index 0000000..9d75d03 --- /dev/null +++ b/artifact.preflight_checker/src/artifact_preflight_checker/cli.py @@ -0,0 +1,61 @@ +import argparse +import json +import logging +from pathlib import Path +from typing import Any, Dict + +from artifact_preflight_checker.core import run_preflight_check + + +logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s') +logger = logging.getLogger(__name__) + + +def _validate_path(path_str: str, must_exist: bool = False) -> Path: + path = Path(path_str) + if must_exist and not path.exists(): + raise FileNotFoundError(f"Pfad existiert nicht: {path}") + return path + + +def _load_json(path: Path) -> Any: + with path.open('r', encoding='utf-8') as f: + return json.load(f) + + +def _write_json(data: Dict[str, Any], path: Path) -> None: + with path.open('w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + +def main() -> None: + """CLI-Entry für den Preflight-Check.""" + parser = argparse.ArgumentParser(description='Mix Freeze Preflight Checker') + parser.add_argument('--input', required=True, help='Pfad zur JSON-Datei mit den valid_runs.') + parser.add_argument('--output', required=True, help='Pfad zur Schreibdatei für das Preflight-Ergebnis.') + + args = parser.parse_args() + + input_path = _validate_path(args.input, must_exist=True) + output_path = _validate_path(args.output, must_exist=False) + + logger.info("Lade Eingabedaten von %s", input_path) + valid_runs = _load_json(input_path) + + if not isinstance(valid_runs, list): + raise ValueError('Eingabedatei muss eine Liste von Run-Dictionaries enthalten.') + + logger.info("Starte Preflight-Check ...") + result = run_preflight_check(valid_runs) + + expected_keys = {'freeze_ok', 'freeze_target', 'freeze_tol'} + if not expected_keys.issubset(result.keys()): + raise ValueError(f'Preflight-Resultat hat unerwartete Struktur: {result}') + + logger.info("Schreibe Ergebnis nach %s", output_path) + _write_json(result, output_path) + logger.info("Preflight abgeschlossen: freeze_ok=%s", result.get('freeze_ok')) + + +if __name__ == '__main__': + main()