From ed492a5e27c5397645d8d34988cdfde5f28170cd Mon Sep 17 00:00:00 2001
From: Mika
Date: Sat, 14 Feb 2026 15:32:00 +0000
Subject: [PATCH] Add latency_calculator/src/latency_calculator/cli.py

---
 .../src/latency_calculator/cli.py | 95 +++++++++++++++++++
 1 file changed, 95 insertions(+)
 create mode 100644 latency_calculator/src/latency_calculator/cli.py

diff --git a/latency_calculator/src/latency_calculator/cli.py b/latency_calculator/src/latency_calculator/cli.py
new file mode 100644
index 0000000..018feed
--- /dev/null
+++ b/latency_calculator/src/latency_calculator/cli.py
@@ -0,0 +1,95 @@
+import argparse
+import json
+import sys
+from pathlib import Path
+from typing import Any, Dict, List
+import pandas as pd
+
+from latency_calculator import core
+
+
+class CLIError(Exception):
+    """Custom exception for CLI-related errors."""
+    pass
+
+
+def _parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="Compute latency statistics from JSONL files containing timestamps."
+    )
+    parser.add_argument("--input", required=True, help="Path to the JSONL file with timestamps")
+    parser.add_argument("--output", required=True, help="Path to the output file for latency statistics")
+    return parser.parse_args()
+
+
+def _load_jsonl(file_path: Path) -> List[Dict[str, Any]]:
+    if not file_path.exists():
+        raise CLIError(f"Input file not found: {file_path}")
+    entries = []
+    with file_path.open("r", encoding="utf-8") as f:
+        for line in f:
+            if line.strip():
+                try:
+                    entries.append(json.loads(line))
+                except json.JSONDecodeError as e:
+                    raise CLIError(f"Invalid JSON line: {e}") from e
+    if not entries:
+        raise CLIError("Input file is empty or contains no valid entries.")
+    return entries
+
+
+def _validate_entry(entry: Dict[str, Any]) -> bool:
+    required_fields = {"t_publish", "t_gate_read", "t_index_visible"}
+    if not required_fields.issubset(entry):
+        missing = required_fields - set(entry)
+        raise CLIError(f"Missing required fields: {', '.join(missing)}")
+    return True
+
+
+def main() -> None:
+    args = _parse_args()
+    input_path = Path(args.input)
+    output_path = Path(args.output)
+
+    # Load and validate the input entries
+    entries = _load_jsonl(input_path)
+    valid_entries = []
+
+    for e in entries:
+        _validate_entry(e)
+        valid_entries.append(e)
+
+    # Compute per-entry latency statistics via the core module
+    results = []
+    for entry in valid_entries:
+        stats = core.calculate_latency(entry)
+        results.append(stats)
+
+    # Aggregate the per-entry statistics across all entries
+    df = pd.DataFrame(results)
+    aggregated = {
+        "p50": float(df["p50"].mean()),
+        "p95": float(df["p95"].mean()),
+        "max": float(df["max"].max())
+    }
+
+    # Ensure the output directory exists
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    # Write the aggregated statistics as JSON
+    with output_path.open("w", encoding="utf-8") as f:
+        json.dump(aggregated, f, indent=2)
+
+    # Basic sanity check on the output schema
+    assert set(aggregated.keys()) == {"p50", "p95", "max"}, "Output fields mismatch"
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except CLIError as err:
+        print(f"Error: {err}", file=sys.stderr)
+        sys.exit(1)
+    except Exception as e:
+        print(f"Unexpected error: {e}", file=sys.stderr)
+        sys.exit(2)
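
Usage sketch for reviewers (commentary only, not part of the diff): the snippet below writes two sample events and runs the new CLI as a module. The field names t_publish, t_gate_read, and t_index_visible come from _validate_entry, and the output keys "p50", "p95", and "max" are what main() expects; the epoch-millisecond timestamp values and the assumption that the package is installed so that latency_calculator.cli is importable are illustrative and not pinned down by this patch.

# Minimal smoke test for the new CLI. Assumptions (not fixed by this patch):
# timestamps are epoch milliseconds, and the package is installed so that
# "latency_calculator.cli" can be run with "-m".
import json
import subprocess
import sys
import tempfile
from pathlib import Path

sample = [
    {"t_publish": 1760000000000, "t_gate_read": 1760000000120, "t_index_visible": 1760000000480},
    {"t_publish": 1760000001000, "t_gate_read": 1760000001090, "t_index_visible": 1760000001350},
]

with tempfile.TemporaryDirectory() as tmp:
    inp = Path(tmp) / "timestamps.jsonl"
    out = Path(tmp) / "latency_stats.json"
    inp.write_text("\n".join(json.dumps(e) for e in sample), encoding="utf-8")
    subprocess.run(
        [sys.executable, "-m", "latency_calculator.cli", "--input", str(inp), "--output", str(out)],
        check=True,
    )
    # main() writes exactly the keys "p50", "p95", and "max"
    print(json.loads(out.read_text(encoding="utf-8")))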