From a1c7919db3622658109d34cffd036d9f27ab170a Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 15 Feb 2026 11:41:27 +0000 Subject: [PATCH] Add measure_latency/src/measure_latency/core.py --- measure_latency/src/measure_latency/core.py | 109 ++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 measure_latency/src/measure_latency/core.py diff --git a/measure_latency/src/measure_latency/core.py b/measure_latency/src/measure_latency/core.py new file mode 100644 index 0000000..19bb546 --- /dev/null +++ b/measure_latency/src/measure_latency/core.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import time +import json +import os +from pathlib import Path +from datetime import datetime +from statistics import median, quantiles, variance +from typing import List, Dict, Any + + +class LatencyResult: + """Repräsentiert eine einzelne Messung der Latenzzeit-Komponenten.""" + + def __init__( + self, + upload_end_time: datetime, + api_response_time: datetime, + fs_mtime: datetime, + ) -> None: + assert isinstance(upload_end_time, datetime), "upload_end_time muss datetime sein" + assert isinstance(api_response_time, datetime), "api_response_time muss datetime sein" + assert isinstance(fs_mtime, datetime), "fs_mtime muss datetime sein" + + self.upload_end_time = upload_end_time + self.api_response_time = api_response_time + self.fs_mtime = fs_mtime + self.offsets: Dict[str, float] = self.compute_offsets() + + def compute_offsets(self) -> Dict[str, float]: + """Berechnet Zeitdifferenzen zwischen den Messpunkten in Sekunden.""" + upload_to_api = (self.api_response_time - self.upload_end_time).total_seconds() + api_to_fs = (self.fs_mtime - self.api_response_time).total_seconds() + upload_to_fs = (self.fs_mtime - self.upload_end_time).total_seconds() + return { + "upload_to_api": upload_to_api, + "api_to_fs": api_to_fs, + "upload_to_fs": upload_to_fs, + } + + def to_dict(self) -> Dict[str, Any]: + return { + "upload_end_time": self.upload_end_time.isoformat(), + "api_response_time": self.api_response_time.isoformat(), + "fs_mtime": self.fs_mtime.isoformat(), + "offsets": self.offsets, + } + + +def measure_latencies(n_runs: int) -> List[LatencyResult]: + """Führt n wiederholte Messungen aus und protokolliert Zeitpunkte pro Messung.""" + assert isinstance(n_runs, int) and n_runs > 0, "n_runs muss eine positive ganze Zahl sein" + results: List[LatencyResult] = [] + + tmp_file = Path("/tmp/latency_probe.txt") + for _ in range(n_runs): + upload_end = datetime.now() + # simulierte API-Response nach kurzer Pause + time.sleep(0.01) + api_response = datetime.now() + # simuliertes Datei-Schreiben + tmp_file.write_text(str(upload_end.timestamp())) + fs_stat = os.stat(tmp_file) + fs_mtime = datetime.fromtimestamp(fs_stat.st_mtime) + result = LatencyResult(upload_end, api_response, fs_mtime) + results.append(result) + + output_dir = Path("output") + output_dir.mkdir(exist_ok=True) + json_path = output_dir / "latency_results.json" + with json_path.open("w", encoding="utf-8") as f: + json.dump([r.to_dict() for r in results], f, indent=2) + + return results + + +def analyze_results(latency_results: List[LatencyResult]) -> Dict[str, Any]: + """Analysiert Latenzergebnisse und berechnet Statistiken.""" + assert isinstance(latency_results, list) and all(isinstance(r, LatencyResult) for r in latency_results), ( + "latency_results muss eine Liste von LatencyResult sein" + ) + + all_upload_to_fs = [r.offsets["upload_to_fs"] for r in latency_results] + if not all_upload_to_fs: + return {} + + median_val = median(all_upload_to_fs) + try: + q50 = quantiles(all_upload_to_fs, n=100)[49] + q95 = quantiles(all_upload_to_fs, n=100)[94] + except Exception: + q50 = median_val + q95 = max(all_upload_to_fs) + + summary: Dict[str, Any] = { + "count": len(all_upload_to_fs), + "p50": q50, + "p95": q95, + "max": max(all_upload_to_fs), + "variance": variance(all_upload_to_fs) if len(all_upload_to_fs) > 1 else 0.0, + } + + output_dir = Path("output") + output_dir.mkdir(exist_ok=True) + summary_path = output_dir / "analysis_summary.json" + with summary_path.open("w", encoding="utf-8") as f: + json.dump(summary, f, indent=2) + + return summary