Add drift_report_parser/src/drift_report_parser/cli.py
This commit is contained in:
parent
bcb578b3e2
commit
fc5b91f2a7
1 changed files with 63 additions and 0 deletions
63
drift_report_parser/src/drift_report_parser/cli.py
Normal file
63
drift_report_parser/src/drift_report_parser/cli.py
Normal file
|
|
@ -0,0 +1,63 @@
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
# Lokaler Import aus dem Core-Modul
|
||||||
|
from drift_report_parser.core import parse_drift_report
|
||||||
|
|
||||||
|
def main(argv: list[str] | None = None) -> None:
|
||||||
|
"""Einstiegspunkt für die CLI zur Analyse von Drift-Reports.
|
||||||
|
|
||||||
|
Liest den angegebenen Drift-Report und schreibt die extrahierten Felder
|
||||||
|
als JSON-Datei zur weiteren Verarbeitung in CI/CD-Pipelines.
|
||||||
|
"""
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Parse a drift_report.json file and extract key drift metrics."
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--input",
|
||||||
|
required=True,
|
||||||
|
help="Pfad zur JSON-Datei mit Drift-Reports."
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--output",
|
||||||
|
required=True,
|
||||||
|
help="Pfad zur Ausgabedatei, in die die extrahierten Drift-Daten geschrieben werden."
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
|
input_path = Path(args.input)
|
||||||
|
output_path = Path(args.output)
|
||||||
|
|
||||||
|
# Input-Validierung
|
||||||
|
if not input_path.exists():
|
||||||
|
sys.stderr.write(f"ERROR: Eingabedatei '{input_path}' wurde nicht gefunden.\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if not input_path.is_file():
|
||||||
|
sys.stderr.write(f"ERROR: '{input_path}' ist keine Datei.\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
parsed_data: list[dict[str, Any]] = parse_drift_report(str(input_path))
|
||||||
|
except (json.JSONDecodeError, OSError) as err:
|
||||||
|
sys.stderr.write(f"Fehler beim Lesen/Parsen von '{input_path}': {err}\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Ausgabeverzeichnis ggf. anlegen
|
||||||
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with output_path.open("w", encoding="utf-8") as f_out:
|
||||||
|
json.dump(parsed_data, f_out, ensure_ascii=False, indent=2)
|
||||||
|
except OSError as err:
|
||||||
|
sys.stderr.write(f"Fehler beim Schreiben in '{output_path}': {err}\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
print(f"Erfolgreich {len(parsed_data)} Einträge nach '{output_path}' exportiert.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in a new issue