Add decision_engine/src/decision_engine/cli.py

This commit is contained in:
Mika 2026-02-05 13:42:03 +00:00
parent e2ce445ffa
commit 08b7308e1f


@@ -0,0 +1,112 @@
from __future__ import annotations

import argparse
import json
import logging
from pathlib import Path
from typing import Any, Dict, List

import pandas as pd

from decision_engine.core import evaluate_run

logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s - %(message)s'
)


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description='Policy v1.1 decision engine evaluation'
    )
    parser.add_argument('--input', required=True, help='Path to the audit.csv with historical runs.')
    parser.add_argument('--constants', required=True, help='Path to policy_constants.json with the threshold definitions.')
    parser.add_argument('--output', required=True, help='Output path for the evaluation results in JSON format.')
    return parser.parse_args()


def _validate_input_paths(input_path: Path, constants_path: Path) -> None:
    if not input_path.exists() or not input_path.is_file():
        raise FileNotFoundError(f"Input CSV not found: {input_path}")
    if not constants_path.exists() or not constants_path.is_file():
        raise FileNotFoundError(f"Constants JSON not found: {constants_path}")


def _load_constants(constants_path: Path) -> Dict[str, Any]:
    with constants_path.open('r', encoding='utf-8') as f:
        try:
            constants = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in {constants_path}: {e}") from e
    if not isinstance(constants, dict):
        raise ValueError("Policy constants must be a JSON object.")
    return constants


def _load_runs(input_path: Path) -> pd.DataFrame:
    try:
        df = pd.read_csv(input_path)
    except Exception as e:
        raise ValueError(f"Error reading the CSV file: {e}") from e
    required_cols = {"run_id", "pinned", "warn_rate", "unknown_rate", "unknown_class", "prev_label"}
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns in CSV: {missing}")
    return df


def _evaluate_all(df: pd.DataFrame, constants: Dict[str, Any]) -> List[Dict[str, Any]]:
    results: List[Dict[str, Any]] = []
    for _, row in df.iterrows():
        # Normalize the raw CSV row into plain Python types before evaluation.
        run_data = {
            'run_id': str(row['run_id']),
            'pinned': bool(row['pinned']),
            'warn_rate': float(row['warn_rate']),
            'unknown_rate': float(row['unknown_rate']),
            'unknown_class': str(row['unknown_class']),
            'prev_label': str(row['prev_label']) if not pd.isna(row['prev_label']) else ''
        }
        logger.debug("Evaluating run_id=%s", run_data['run_id'])
        result = evaluate_run(run_data)
        assert isinstance(result, dict), "evaluate_run must return a dict"
        assert 'final_decision' in result and 'reason' in result, "evaluate_run result is incomplete"
        results.append({
            'run_id': run_data['run_id'],
            'final_decision': result['final_decision'],
            'reason': result['reason']
        })
    return results


def _save_results(output_path: Path, results: List[Dict[str, Any]]) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open('w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    logger.info("Results successfully written to %s (%d runs).", output_path, len(results))


def main() -> None:
    args = _parse_args()
    input_path = Path(args.input).resolve()
    constants_path = Path(args.constants).resolve()
    output_path = Path(args.output).resolve()
    logger.info("Starting Policy v1.1 evaluation...")
    _validate_input_paths(input_path, constants_path)
    constants = _load_constants(constants_path)
    runs_df = _load_runs(input_path)
    results = _evaluate_all(runs_df, constants)
    _save_results(output_path, results)
    logger.info("Evaluation finished.")


if __name__ == '__main__':
    main()
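
For anyone trying the new entry point locally, a minimal smoke test might look like the sketch below. Only the required CSV columns and the --input/--constants/--output flags come from the file above; the row values, the empty constants object, and the `python -m decision_engine.cli` invocation (which assumes the package is installed or on PYTHONPATH) are illustrative assumptions, since the real policy_constants.json schema is defined by decision_engine.core.

# Illustrative smoke test for the new CLI; values and constants are placeholders.
import json
import subprocess

import pandas as pd

# One-row audit.csv with exactly the columns _load_runs requires.
pd.DataFrame([{
    "run_id": "run-001",
    "pinned": True,
    "warn_rate": 0.02,
    "unknown_rate": 0.0,
    "unknown_class": "none",
    "prev_label": "",
}]).to_csv("audit.csv", index=False)

# Placeholder constants object; whether an empty dict is accepted depends on
# what decision_engine.core expects in policy_constants.json.
with open("policy_constants.json", "w", encoding="utf-8") as f:
    json.dump({}, f)

# Invoke the CLI; on success, results.json holds a list of
# {"run_id", "final_decision", "reason"} objects, one per input row.
subprocess.run(
    ["python", "-m", "decision_engine.cli",
     "--input", "audit.csv",
     "--constants", "policy_constants.json",
     "--output", "results.json"],
    check=True,
)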