Add dry_run_mode/src/dry_run_mode/cli.py

This commit is contained in:
Mika 2026-02-06 15:31:51 +00:00
parent b1d3675653
commit 01a9ae059e

View file

@ -0,0 +1,67 @@
import argparse
import json
import hashlib
from pathlib import Path
from typing import Any
from dry_run_mode import core
def _load_json(path: Path) -> dict[str, Any]:
if not path.exists():
raise FileNotFoundError(f"Eingabedatei {path} wurde nicht gefunden.")
with path.open('r', encoding='utf-8') as f:
try:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError(f"Datei {path} enthält kein JSON-Objekt.")
return data
except json.JSONDecodeError as e:
raise ValueError(f"Ungültiges JSON in {path}: {e}")
def _write_json(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open('w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def _compute_policy_hash(constants: dict[str, Any]) -> str:
encoded = json.dumps(constants, sort_keys=True).encode('utf-8')
return hashlib.sha256(encoded).hexdigest()
def main() -> None:
parser = argparse.ArgumentParser(description="Dry-Run Evaluierung der Policy-Entscheidungen.")
parser.add_argument('--input', required=True, help='Pfad zur Input-Datei drift_report.json')
parser.add_argument('--constants', required=True, help='Pfad zur JSON-Datei mit Policy-Konstanten')
parser.add_argument('--output', required=False, help='Pfad zur Ausgabe-Datei (Default: output/dry_run_result.json)')
args = parser.parse_args()
input_path = Path(args.input)
constants_path = Path(args.constants)
output_path = Path(args.output) if args.output else Path('output/dry_run_result.json')
drift_report = _load_json(input_path)
policy_constants = _load_json(constants_path)
# Validate required keys
if not all(k in policy_constants for k in ("alpha", "offset")):
raise KeyError("policy_constants.json muss Felder 'alpha' und 'offset' enthalten.")
result = core.dry_run_evaluation(drift_report)
policy_hash = _compute_policy_hash(policy_constants)
dry_run_output = {
"policy_hash": policy_hash,
"results": result
}
_write_json(output_path, dry_run_output)
print(f"Dry-Run abgeschlossen. Ausgabe geschrieben nach: {output_path}")
if __name__ == '__main__':
main()