Add latency_calculator/src/latency_calculator/core.py

Mika 2026-02-14 15:32:00 +00:00
parent 39f7ad883a
commit 98cc6399b4


@@ -0,0 +1,105 @@
from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Union

import pandas as pd

logger = logging.getLogger(__name__)
# Note: basicConfig at import time configures the root logger; in library
# code this is usually better left to the application entry point.
logging.basicConfig(level=logging.INFO)
@dataclass
class TimestampEntry:
    """Raw timestamps for a single publish/index event."""

    t_publish: datetime
    t_gate_read: datetime
    t_index_visible: datetime


@dataclass
class LatencyStatistics:
    """Aggregated latency quantiles, in seconds."""

    p50: float
    p95: float
    max: float
def _parse_timestamp(value: Union[str, int, float, datetime]) -> datetime:
    """Parse a timestamp given as datetime, epoch seconds, or ISO-8601 string.

    Epoch values are interpreted in local time by datetime.fromtimestamp;
    ISO strings with a trailing 'Z' are normalized to '+00:00' for Python
    versions where fromisoformat does not accept 'Z' directly.
    """
    if isinstance(value, datetime):
        return value
    if isinstance(value, (int, float)):
        return datetime.fromtimestamp(value)
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            # Support ISO-8601 timestamps with a trailing 'Z' (UTC).
            if value.endswith('Z'):
                try:
                    return datetime.fromisoformat(value.replace('Z', '+00:00'))
                except ValueError as e:
                    raise ValueError(f"Invalid time format: {value}") from e
            raise ValueError(f"Invalid time format: {value}")
    raise TypeError(f"Invalid time type: {type(value)}")
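
# Illustrative inputs for _parse_timestamp (assumed examples, not part of
# the original commit):
#   _parse_timestamp("2026-02-14T15:32:00Z")  -> 2026-02-14 15:32:00+00:00
#   _parse_timestamp(1771083120.0)            -> naive datetime in local time
#   _parse_timestamp(datetime(2026, 2, 14))   -> returned unchanged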
def calculate_latency(timestamp_entry: Dict[str, Any]) -> Dict[str, float]:
    """Compute p50, p95, and maximum latency values from a set of timestamps."""
    if not isinstance(timestamp_entry, dict):
        raise TypeError("timestamp_entry must be a dictionary")
    # The payload is either a single entry or a wrapper holding an "entries"
    # list; checking the wrapper first keeps the required-keys check from
    # rejecting batched payloads.
    if "entries" in timestamp_entry:
        entries_raw = timestamp_entry["entries"]
        if not isinstance(entries_raw, list):
            raise ValueError("timestamp_entry['entries'] must be a list")
    else:
        required_keys = {"t_publish", "t_gate_read", "t_index_visible"}
        missing = required_keys - timestamp_entry.keys()
        if missing:
            raise KeyError(f"Missing keys in timestamp_entry: {missing}")
        entries_raw = [timestamp_entry]
    logger.info("Processing %d timestamp entries for latency calculation", len(entries_raw))
    records = []
    for entry in entries_raw:
        try:
            t_publish = _parse_timestamp(entry["t_publish"])
            t_gate_read = _parse_timestamp(entry["t_gate_read"])
            t_index_visible = _parse_timestamp(entry["t_index_visible"])
            gate_latency = (t_gate_read - t_publish).total_seconds()
            visible_latency = (t_index_visible - t_publish).total_seconds()
            if gate_latency < 0 or visible_latency < 0:
                logger.warning("Negative latency detected in %s", entry)
            records.append({
                "latency_gate": gate_latency,
                "latency_visible": visible_latency
            })
        except (KeyError, ValueError, TypeError) as exc:
            # KeyError: missing field; ValueError: unparseable timestamp;
            # TypeError: unsupported type or naive/aware datetime mix.
            logger.error("Error processing entry %s: %s", entry, exc)
    if not records:
        raise ValueError("No valid records for latency calculation")
    df = pd.DataFrame(records)
    # Combine both latency types into one vector for the statistics.
    combined_latencies = pd.concat([df["latency_gate"], df["latency_visible"]], ignore_index=True)
    p50 = float(combined_latencies.quantile(0.50))
    p95 = float(combined_latencies.quantile(0.95))
    max_latency = float(combined_latencies.max())
    stats = LatencyStatistics(p50=p50, p95=p95, max=max_latency)
    # Sanity checks for CI readiness.
    assert stats.p50 >= 0, "p50 must not be negative"
    assert stats.max >= stats.p95 >= stats.p50, "quantiles must be ordered"
    result = {"p50": stats.p50, "p95": stats.p95, "max": stats.max}
    logger.info("Computed latency statistics: %s", json.dumps(result, indent=2))
    return result
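
# Hedged usage sketch, not part of the original commit: a minimal demo of
# calculate_latency on a fabricated batched payload. The "entries" wrapper
# and the three timestamp fields mirror the parsing logic above; the
# concrete values are illustrative only.
if __name__ == "__main__":
    sample = {
        "entries": [
            {
                "t_publish": "2026-02-14T15:00:00Z",
                "t_gate_read": "2026-02-14T15:00:01Z",
                "t_index_visible": "2026-02-14T15:00:03Z",
            },
            {
                "t_publish": "2026-02-14T15:01:00Z",
                "t_gate_read": "2026-02-14T15:01:02Z",
                "t_index_visible": "2026-02-14T15:01:05Z",
            },
        ]
    }
    # Expected: p50/p95/max computed over the combined vector [1.0, 2.0, 3.0, 5.0] seconds.
    print(calculate_latency(sample))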