Add retry_mechanism_analysis/src/retry_mechanism_analysis/core.py

This commit is contained in:
Mika 2026-03-12 11:51:46 +00:00
parent 9602867ad1
commit 6408ac9b7f

View file

@ -0,0 +1,60 @@
import pandas as pd
import logging
from typing import List, Dict, Any, TypedDict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RetryAnalysis(TypedDict):
parallelism_level: int
p50: float
p95: float
p99: float
def _validate_log_data(log_data: List[Dict[str, Any]]) -> None:
if not isinstance(log_data, list):
raise ValueError("log_data must be a list of dictionaries.")
for entry in log_data:
if not isinstance(entry, dict):
raise ValueError("Each log entry must be a dictionary.")
required_keys = {"parallelism_level", "retry_latency_ms"}
missing = required_keys - set(entry.keys())
if missing:
raise ValueError(f"Missing keys in log entry: {missing}")
if not isinstance(entry["parallelism_level"], int):
raise ValueError("'parallelism_level' must be an integer.")
if not isinstance(entry["retry_latency_ms"], (int, float)):
raise ValueError("'retry_latency_ms' must be numeric.")
def compare_retry_overhead(log_data: List[Dict[str, Any]]) -> List[RetryAnalysis]:
"""Vergleicht Retry-Overheads über verschiedene Parallelitätslevels und liefert aggregierte Metriken."""
_validate_log_data(log_data)
logger.info("Start comparing retry overheads across parallelism levels.")
df = pd.DataFrame(log_data)
if df.empty:
raise ValueError("log_data is empty; no data to analyze.")
grouped = df.groupby("parallelism_level")["retry_latency_ms"]
results: List[RetryAnalysis] = []
for level, latencies in grouped:
p50 = float(latencies.quantile(0.5))
p95 = float(latencies.quantile(0.95))
p99 = float(latencies.quantile(0.99))
analysis: RetryAnalysis = {
"parallelism_level": int(level),
"p50": p50,
"p95": p95,
"p99": p99,
}
results.append(analysis)
logger.debug(f"Computed metrics for parallelism {level}: {analysis}")
logger.info("Completed retry overhead comparison.")
return results