Add event_data_capture/src/event_data_capture/cli.py

This commit is contained in:
Mika 2026-02-27 10:56:51 +00:00
parent 66e28218a5
commit eaf1de3ade

View file

@ -0,0 +1,61 @@
import argparse
import json
import os
from pathlib import Path
from typing import Any, List
from event_data_capture.core import capture_event_data, EventData
def _validate_file_path(path_str: str) -> Path:
path = Path(path_str)
if not path.exists() or not path.is_file():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {path_str}")
return path
def _read_json_file(path: Path) -> Any:
with path.open('r', encoding='utf-8') as f:
return json.load(f)
def _write_json_file(path: Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open('w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def main() -> None:
parser = argparse.ArgumentParser(description="Erfasst und aggregiert Eventdaten für Run #6.")
parser.add_argument('--input', required=True, help='Pfad zur JSON-Datei mit Eventdaten.')
parser.add_argument('--output', required=True, help='Pfad zur Ausgabedatei für die Event-Zusammenfassung.')
args = parser.parse_args()
input_path = _validate_file_path(args.input)
output_path = Path(args.output)
input_data = _read_json_file(input_path)
if not isinstance(input_data, dict):
raise ValueError('Eingabedatei muss ein JSON-Objekt mit Strata enthalten.')
summary: dict[str, List[dict[str, float]]] = {}
for stratum_name, events in input_data.items():
if not isinstance(stratum_name, str):
continue
try:
event_list = capture_event_data(stratum_name)
except Exception as e:
raise RuntimeError(f"Fehler bei Verarbeitung von {stratum_name}: {e}") from e
summary[stratum_name] = [
{'warn_rate': e.warn_rate, 'unknown_rate': e.unknown_rate} for e in event_list
]
_write_json_file(output_path, summary)
if __name__ == '__main__':
main()