From fc5b91f2a7970b4219ba2ebcea68c350e0fdedbe Mon Sep 17 00:00:00 2001 From: Mika Date: Sat, 31 Jan 2026 13:07:38 +0000 Subject: [PATCH] Add drift_report_parser/src/drift_report_parser/cli.py --- .../src/drift_report_parser/cli.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 drift_report_parser/src/drift_report_parser/cli.py diff --git a/drift_report_parser/src/drift_report_parser/cli.py b/drift_report_parser/src/drift_report_parser/cli.py new file mode 100644 index 0000000..0a6fb14 --- /dev/null +++ b/drift_report_parser/src/drift_report_parser/cli.py @@ -0,0 +1,63 @@ +import argparse +import json +import sys +from pathlib import Path +from typing import Any + +# Lokaler Import aus dem Core-Modul +from drift_report_parser.core import parse_drift_report + +def main(argv: list[str] | None = None) -> None: + """Einstiegspunkt für die CLI zur Analyse von Drift-Reports. + + Liest den angegebenen Drift-Report und schreibt die extrahierten Felder + als JSON-Datei zur weiteren Verarbeitung in CI/CD-Pipelines. + """ + parser = argparse.ArgumentParser( + description="Parse a drift_report.json file and extract key drift metrics." + ) + parser.add_argument( + "--input", + required=True, + help="Pfad zur JSON-Datei mit Drift-Reports." + ) + parser.add_argument( + "--output", + required=True, + help="Pfad zur Ausgabedatei, in die die extrahierten Drift-Daten geschrieben werden." + ) + + args = parser.parse_args(argv) + + input_path = Path(args.input) + output_path = Path(args.output) + + # Input-Validierung + if not input_path.exists(): + sys.stderr.write(f"ERROR: Eingabedatei '{input_path}' wurde nicht gefunden.\n") + sys.exit(1) + + if not input_path.is_file(): + sys.stderr.write(f"ERROR: '{input_path}' ist keine Datei.\n") + sys.exit(1) + + try: + parsed_data: list[dict[str, Any]] = parse_drift_report(str(input_path)) + except (json.JSONDecodeError, OSError) as err: + sys.stderr.write(f"Fehler beim Lesen/Parsen von '{input_path}': {err}\n") + sys.exit(1) + + try: + # Ausgabeverzeichnis ggf. anlegen + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8") as f_out: + json.dump(parsed_data, f_out, ensure_ascii=False, indent=2) + except OSError as err: + sys.stderr.write(f"Fehler beim Schreiben in '{output_path}': {err}\n") + sys.exit(1) + + print(f"Erfolgreich {len(parsed_data)} Einträge nach '{output_path}' exportiert.") + + +if __name__ == "__main__": + main() \ No newline at end of file