Add clocksource_switch_analysis/src/clocksource_switch_analysis/cli.py

This commit is contained in:
Mika 2026-01-22 11:58:37 +00:00
parent b678427be8
commit 420c09c6bb

View file

@ -0,0 +1,60 @@
import argparse
import json
import sys
from pathlib import Path
from typing import Any
from clocksource_switch_analysis import core
def _load_json(file_path: Path) -> Any:
if not file_path.exists():
raise FileNotFoundError(f"Input file not found: {file_path}")
with file_path.open('r', encoding='utf-8') as f:
try:
data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in input file {file_path}: {e}") from e
if not isinstance(data, list):
raise ValueError("Expected top-level JSON array of event objects.")
return data
def _save_json(file_path: Path, content: Any) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open('w', encoding='utf-8') as f:
json.dump(content, f, indent=2, sort_keys=True)
def main(argv: list[str] | None = None) -> None:
parser = argparse.ArgumentParser(
description="Aggregates clocksource_switch event data to summary metrics."
)
parser.add_argument(
"--input",
required=True,
help="Pfad zur JSON-Datei mit Rohdaten der clocksource_switch-Events."
)
parser.add_argument(
"--output",
required=True,
help="Pfad für die aggregierte JSON-Ausgabe."
)
args = parser.parse_args(argv)
input_path = Path(args.input)
output_path = Path(args.output)
raw_data = _load_json(input_path)
try:
summary = core.aggregate_clocksource_data(raw_data)
except Exception as e:
print(f"Fehler bei der Aggregation: {e}", file=sys.stderr)
sys.exit(1)
_save_json(output_path, summary)
if __name__ == "__main__":
main()