Add measure_latency/src/measure_latency/core.py

This commit is contained in:
Mika 2026-02-15 11:41:27 +00:00
commit a1c7919db3

View file

@ -0,0 +1,109 @@
from __future__ import annotations
import time
import json
import os
from pathlib import Path
from datetime import datetime
from statistics import median, quantiles, variance
from typing import List, Dict, Any
class LatencyResult:
"""Repräsentiert eine einzelne Messung der Latenzzeit-Komponenten."""
def __init__(
self,
upload_end_time: datetime,
api_response_time: datetime,
fs_mtime: datetime,
) -> None:
assert isinstance(upload_end_time, datetime), "upload_end_time muss datetime sein"
assert isinstance(api_response_time, datetime), "api_response_time muss datetime sein"
assert isinstance(fs_mtime, datetime), "fs_mtime muss datetime sein"
self.upload_end_time = upload_end_time
self.api_response_time = api_response_time
self.fs_mtime = fs_mtime
self.offsets: Dict[str, float] = self.compute_offsets()
def compute_offsets(self) -> Dict[str, float]:
"""Berechnet Zeitdifferenzen zwischen den Messpunkten in Sekunden."""
upload_to_api = (self.api_response_time - self.upload_end_time).total_seconds()
api_to_fs = (self.fs_mtime - self.api_response_time).total_seconds()
upload_to_fs = (self.fs_mtime - self.upload_end_time).total_seconds()
return {
"upload_to_api": upload_to_api,
"api_to_fs": api_to_fs,
"upload_to_fs": upload_to_fs,
}
def to_dict(self) -> Dict[str, Any]:
return {
"upload_end_time": self.upload_end_time.isoformat(),
"api_response_time": self.api_response_time.isoformat(),
"fs_mtime": self.fs_mtime.isoformat(),
"offsets": self.offsets,
}
def measure_latencies(n_runs: int) -> List[LatencyResult]:
"""Führt n wiederholte Messungen aus und protokolliert Zeitpunkte pro Messung."""
assert isinstance(n_runs, int) and n_runs > 0, "n_runs muss eine positive ganze Zahl sein"
results: List[LatencyResult] = []
tmp_file = Path("/tmp/latency_probe.txt")
for _ in range(n_runs):
upload_end = datetime.now()
# simulierte API-Response nach kurzer Pause
time.sleep(0.01)
api_response = datetime.now()
# simuliertes Datei-Schreiben
tmp_file.write_text(str(upload_end.timestamp()))
fs_stat = os.stat(tmp_file)
fs_mtime = datetime.fromtimestamp(fs_stat.st_mtime)
result = LatencyResult(upload_end, api_response, fs_mtime)
results.append(result)
output_dir = Path("output")
output_dir.mkdir(exist_ok=True)
json_path = output_dir / "latency_results.json"
with json_path.open("w", encoding="utf-8") as f:
json.dump([r.to_dict() for r in results], f, indent=2)
return results
def analyze_results(latency_results: List[LatencyResult]) -> Dict[str, Any]:
"""Analysiert Latenzergebnisse und berechnet Statistiken."""
assert isinstance(latency_results, list) and all(isinstance(r, LatencyResult) for r in latency_results), (
"latency_results muss eine Liste von LatencyResult sein"
)
all_upload_to_fs = [r.offsets["upload_to_fs"] for r in latency_results]
if not all_upload_to_fs:
return {}
median_val = median(all_upload_to_fs)
try:
q50 = quantiles(all_upload_to_fs, n=100)[49]
q95 = quantiles(all_upload_to_fs, n=100)[94]
except Exception:
q50 = median_val
q95 = max(all_upload_to_fs)
summary: Dict[str, Any] = {
"count": len(all_upload_to_fs),
"p50": q50,
"p95": q95,
"max": max(all_upload_to_fs),
"variance": variance(all_upload_to_fs) if len(all_upload_to_fs) > 1 else 0.0,
}
output_dir = Path("output")
output_dir.mkdir(exist_ok=True)
summary_path = output_dir / "analysis_summary.json"
with summary_path.open("w", encoding="utf-8") as f:
json.dump(summary, f, indent=2)
return summary