diff --git a/latency_calculator/src/latency_calculator/core.py b/latency_calculator/src/latency_calculator/core.py
new file mode 100644
index 0000000..e57284f
--- /dev/null
+++ b/latency_calculator/src/latency_calculator/core.py
@@ -0,0 +1,119 @@
+from __future__ import annotations
+
+import json
+import logging
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Any, Dict, Union
+
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TimestampEntry:
+    """Documents the expected shape of a single timestamp entry."""
+
+    t_publish: datetime
+    t_gate_read: datetime
+    t_index_visible: datetime
+
+
+@dataclass
+class LatencyStatistics:
+    p50: float
+    p95: float
+    max: float
+
+
+def _parse_timestamp(value: Union[str, int, float, datetime]) -> datetime:
+    """Parse a timestamp into a timezone-aware UTC datetime.
+
+    Accepts datetime objects, Unix epoch seconds, and ISO 8601 strings
+    (including a trailing 'Z'). Naive inputs are assumed to be UTC so
+    that entries mixing formats stay comparable.
+    """
+    if isinstance(value, datetime):
+        return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
+    if isinstance(value, (int, float)):
+        return datetime.fromtimestamp(value, tz=timezone.utc)
+    if isinstance(value, str):
+        try:
+            # fromisoformat() only accepts a trailing 'Z' natively on Python 3.11+
+            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
+        except ValueError as e:
+            raise ValueError(f"Invalid timestamp format: {value!r}") from e
+        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
+    raise TypeError(f"Invalid timestamp type: {type(value)}")
+
+
+def calculate_latency(timestamp_entry: Dict[str, Any]) -> Dict[str, float]:
+    """Compute p50, p95, and maximum latency from a set of timestamps.
+
+    Accepts either a single entry carrying the keys ``t_publish``,
+    ``t_gate_read``, and ``t_index_visible``, or a dict whose
+    ``entries`` key holds a list of such entries.
+    """
+    if not isinstance(timestamp_entry, dict):
+        raise TypeError("timestamp_entry must be a dictionary")
+
+    if "entries" in timestamp_entry:
+        entries_raw = timestamp_entry["entries"]
+        if not isinstance(entries_raw, list):
+            raise ValueError("timestamp_entry['entries'] must be a list")
+    else:
+        entries_raw = [timestamp_entry]
+
+    required_keys = {"t_publish", "t_gate_read", "t_index_visible"}
+
+    logger.info("Processing %d timestamp entries for latency calculation", len(entries_raw))
+
+    records = []
+    for entry in entries_raw:
+        missing = required_keys - set(entry)
+        if missing:
+            logger.error("Missing keys in entry %s: %s", entry, missing)
+            continue
+        try:
+            t_publish = _parse_timestamp(entry["t_publish"])
+            t_gate_read = _parse_timestamp(entry["t_gate_read"])
+            t_index_visible = _parse_timestamp(entry["t_index_visible"])
+        except (ValueError, TypeError) as exc:
+            logger.error("Failed to process entry %s: %s", entry, exc)
+            continue
+
+        gate_latency = (t_gate_read - t_publish).total_seconds()
+        visible_latency = (t_index_visible - t_publish).total_seconds()
+
+        if gate_latency < 0 or visible_latency < 0:
+            logger.warning("Negative latency detected in %s", entry)
+
+        records.append({
+            "latency_gate": gate_latency,
+            "latency_visible": visible_latency,
+        })
+
+    if not records:
+        raise ValueError("No valid records for latency calculation")
+
+    df = pd.DataFrame(records)
+
+    # Combine both latency types into one vector for the statistics
+    combined_latencies = pd.concat(
+        [df["latency_gate"], df["latency_visible"]], ignore_index=True
+    )
+
+    stats = LatencyStatistics(
+        p50=float(combined_latencies.quantile(0.50)),
+        p95=float(combined_latencies.quantile(0.95)),
+        max=float(combined_latencies.max()),
+    )
+
+    # Sanity check: quantiles of the same sample are monotone by construction
+    assert stats.max >= stats.p95 >= stats.p50, "Quantiles must be monotone"
+
+    result = {"p50": stats.p50, "p95": stats.p95, "max": stats.max}
+    logger.info("Computed latency statistics: %s", json.dumps(result, indent=2))
+
+    return result
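
For reference, a minimal usage sketch of `calculate_latency`. It assumes the package imports as `latency_calculator.core` (matching the path in this diff); the timestamps below are illustrative values, not real pipeline data:

```python
from latency_calculator.core import calculate_latency

# Flat form: one set of timestamps; formats may be mixed per field
stats = calculate_latency({
    "t_publish": "2024-01-01T00:00:00Z",
    "t_gate_read": "2024-01-01T00:00:02Z",  # gate latency: 2.0 s
    "t_index_visible": 1704067205,          # epoch seconds; visible latency: 5.0 s
})
print(stats)  # {'p50': 3.5, 'p95': 4.85, 'max': 5.0}

# Batch form: several entries under the "entries" key
stats = calculate_latency({
    "entries": [
        {
            "t_publish": "2024-01-01T00:00:00Z",
            "t_gate_read": "2024-01-01T00:00:01Z",
            "t_index_visible": "2024-01-01T00:00:03Z",
        },
        {
            "t_publish": "2024-01-01T00:00:00Z",
            "t_gate_read": "2024-01-01T00:00:02Z",
            "t_index_visible": "2024-01-01T00:00:08Z",
        },
    ]
})
```

Note that the statistics pool gate and visible latencies into a single vector, so the flat example above computes its quantiles over the two values [2.0, 5.0] and the batch example over four values.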