Add latency_calculator/src/latency_calculator/cli.py
parent 98cc6399b4
commit ed492a5e27
1 changed file with 95 additions and 0 deletions
95  latency_calculator/src/latency_calculator/cli.py  Normal file
@@ -0,0 +1,95 @@
import argparse
import json
from pathlib import Path
from typing import Any, Dict, List
from datetime import datetime
import pandas as pd

from latency_calculator import core


class CLIError(Exception):
    """Custom exception for CLI-related errors."""
    pass


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Computes latency statistics from JSONL files with timestamps."
    )
    parser.add_argument("--input", required=True, help="Path to the JSONL file with timestamps")
    parser.add_argument("--output", required=True, help="Path to the output file for latency statistics")
    return parser.parse_args()


def _load_jsonl(file_path: Path) -> List[Dict[str, Any]]:
    if not file_path.exists():
        raise CLIError(f"Input file not found: {file_path}")
    entries = []
    with file_path.open("r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as e:
                    raise CLIError(f"Invalid JSON line: {e}") from e
    if not entries:
        raise CLIError("Input file is empty or contains no valid entries.")
    return entries


def _validate_entry(entry: Dict[str, Any]) -> bool:
    required_fields = {"t_publish", "t_gate_read", "t_index_visible"}
    if not required_fields.issubset(entry):
        missing = required_fields - set(entry)
        raise CLIError(f"Missing required fields: {', '.join(missing)}")
    return True


def main() -> None:
    args = _parse_args()
    input_path = Path(args.input)
    output_path = Path(args.output)

    # Load and validate input
    entries = _load_jsonl(input_path)
    valid_entries = []

    for e in entries:
        _validate_entry(e)
        valid_entries.append(e)

    # Calculate latency statistics using the core function
    results = []
    for entry in valid_entries:
        stats = core.calculate_latency(entry)
        results.append(stats)

    # Aggregate results if multiple entries
    df = pd.DataFrame(results)
    aggregated = {
        "p50": float(df["p50"].mean()),
        "p95": float(df["p95"].mean()),
        "max": float(df["max"].max())
    }

    # Ensure output directory exists
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Save to output file
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(aggregated, f, indent=2)

    # Basic CI-ready validation
    assert set(aggregated.keys()) == {"p50", "p95", "max"}, "Output fields mismatch"


if __name__ == "__main__":
    try:
        main()
    except CLIError as err:
        print(f"Error: {err}")
        raise SystemExit(1)
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise SystemExit(2)
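A minimal smoke-test sketch of how the new CLI is meant to be driven (not part of the commit): the file names, timestamp values, and install step below are assumptions; only the required input fields and the output keys come from the code above.

import json
import subprocess
import sys
from pathlib import Path

# One event per line; the CLI validates exactly these three timestamp fields.
# The epoch-seconds values are placeholders, not data from the repository.
event = {"t_publish": 1700000000.00, "t_gate_read": 1700000000.12, "t_index_visible": 1700000000.45}
Path("events.jsonl").write_text(json.dumps(event) + "\n", encoding="utf-8")

# Run the module directly (assumes the src layout has been installed, e.g. `pip install -e latency_calculator`).
subprocess.run(
    [sys.executable, "-m", "latency_calculator.cli",
     "--input", "events.jsonl", "--output", "out/stats.json"],
    check=True,
)

# main() writes exactly these aggregated keys to the output file.
print(json.loads(Path("out/stats.json").read_text(encoding="utf-8")))  # {'p50': ..., 'p95': ..., 'max': ...}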