From 08b7308e1fd62ebe3dac8500c6f76da8c3a6ab21 Mon Sep 17 00:00:00 2001
From: Mika
Date: Thu, 5 Feb 2026 13:42:03 +0000
Subject: [PATCH] Add decision_engine/src/decision_engine/cli.py

---
 decision_engine/src/decision_engine/cli.py | 112 +++++++++++++++++++++
 1 file changed, 112 insertions(+)
 create mode 100644 decision_engine/src/decision_engine/cli.py

diff --git a/decision_engine/src/decision_engine/cli.py b/decision_engine/src/decision_engine/cli.py
new file mode 100644
index 0000000..e65913e
--- /dev/null
+++ b/decision_engine/src/decision_engine/cli.py
@@ -0,0 +1,112 @@
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+from pathlib import Path
+from typing import Any, Dict, List
+
+import pandas as pd
+
+from decision_engine.core import evaluate_run
+
+
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s [%(levelname)s] %(name)s - %(message)s'
+)
+
+
+def _parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description='Policy v1.1 decision engine evaluation'
+    )
+    parser.add_argument('--input', required=True, help='Path to audit.csv with historical runs.')
+    parser.add_argument('--constants', required=True, help='Path to policy_constants.json with threshold definitions.')
+    parser.add_argument('--output', required=True, help='Output path for the evaluation results in JSON format.')
+    return parser.parse_args()
+
+
+def _validate_input_paths(input_path: Path, constants_path: Path) -> None:
+    if not input_path.exists() or not input_path.is_file():
+        raise FileNotFoundError(f"Input CSV not found: {input_path}")
+    if not constants_path.exists() or not constants_path.is_file():
+        raise FileNotFoundError(f"Constants JSON not found: {constants_path}")
+
+
+def _load_constants(constants_path: Path) -> Dict[str, Any]:
+    with constants_path.open('r', encoding='utf-8') as f:
+        try:
+            constants = json.load(f)
+        except json.JSONDecodeError as e:
+            raise ValueError(f"Invalid JSON in {constants_path}: {e}") from e
+    if not isinstance(constants, dict):
+        raise ValueError("Policy constants must be a JSON object.")
+    return constants
+
+
+def _load_runs(input_path: Path) -> pd.DataFrame:
+    try:
+        df = pd.read_csv(input_path)
+    except Exception as e:
+        raise ValueError(f"Failed to read CSV file: {e}") from e
+    required_cols = {"run_id", "pinned", "warn_rate", "unknown_rate", "unknown_class", "prev_label"}
+    missing = required_cols - set(df.columns)
+    if missing:
+        raise ValueError(f"Missing columns in CSV: {missing}")
+    return df
+
+
+def _evaluate_all(df: pd.DataFrame, constants: Dict[str, Any]) -> List[Dict[str, Any]]:
+    results: List[Dict[str, Any]] = []
+    for _, row in df.iterrows():
+        run_data = {
+            'run_id': str(row['run_id']),
+            'pinned': bool(row['pinned']),
+            'warn_rate': float(row['warn_rate']),
+            'unknown_rate': float(row['unknown_rate']),
+            'unknown_class': str(row['unknown_class']),
+            'prev_label': str(row['prev_label']) if not pd.isna(row['prev_label']) else ''
+        }
+        logger.debug(f"Evaluating run_id={run_data['run_id']}")
+        result = evaluate_run(run_data)
+        assert isinstance(result, dict), "evaluate_run must return a dict"
+        assert 'final_decision' in result and 'reason' in result, "evaluate_run result is incomplete"
+        results.append({
+            'run_id': run_data['run_id'],
+            'final_decision': result['final_decision'],
+            'reason': result['reason']
+        })
+    return results
+
+
+def _save_results(output_path: Path, results: List[Dict[str, Any]]) -> None:
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    with output_path.open('w', encoding='utf-8') as f:
+        json.dump(results, f, indent=2, ensure_ascii=False)
+    logger.info(f"Results written to {output_path} ({len(results)} runs).")
+
+
+def main() -> None:
+    args = _parse_args()
+
+    input_path = Path(args.input).resolve()
+    constants_path = Path(args.constants).resolve()
+    output_path = Path(args.output).resolve()
+
+    logger.info("Starting Policy v1.1 evaluation...")
+
+    _validate_input_paths(input_path, constants_path)
+
+    constants = _load_constants(constants_path)
+    runs_df = _load_runs(input_path)
+
+    results = _evaluate_all(runs_df, constants)
+    _save_results(output_path, results)
+
+    logger.info("Evaluation complete.")
+
+
+if __name__ == '__main__':
+    main()
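
For reviewers, a minimal sketch of how the file written by _save_results could be consumed downstream. The file name results.json, the invocation shown in the comments, and the counting step are illustrative assumptions, not part of this patch; only the run_id/final_decision/reason keys are taken from the code above.

    # Illustrative consumer of the CLI output (not part of this patch).
    # Assumes the CLI was run as, for example:
    #   python -m decision_engine.cli --input audit.csv \
    #       --constants policy_constants.json --output results.json
    import json
    from collections import Counter
    from pathlib import Path

    # results.json is a JSON list; each entry carries run_id, final_decision
    # and reason, exactly as written by _save_results above.
    results = json.loads(Path("results.json").read_text(encoding="utf-8"))
    print(Counter(entry["final_decision"] for entry in results))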