diff --git a/retry_mechanism_analysis/src/retry_mechanism_analysis/core.py b/retry_mechanism_analysis/src/retry_mechanism_analysis/core.py new file mode 100644 index 0000000..5217a8e --- /dev/null +++ b/retry_mechanism_analysis/src/retry_mechanism_analysis/core.py @@ -0,0 +1,60 @@ +import pandas as pd +import logging +from typing import List, Dict, Any, TypedDict + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class RetryAnalysis(TypedDict): + parallelism_level: int + p50: float + p95: float + p99: float + + +def _validate_log_data(log_data: List[Dict[str, Any]]) -> None: + if not isinstance(log_data, list): + raise ValueError("log_data must be a list of dictionaries.") + for entry in log_data: + if not isinstance(entry, dict): + raise ValueError("Each log entry must be a dictionary.") + required_keys = {"parallelism_level", "retry_latency_ms"} + missing = required_keys - set(entry.keys()) + if missing: + raise ValueError(f"Missing keys in log entry: {missing}") + if not isinstance(entry["parallelism_level"], int): + raise ValueError("'parallelism_level' must be an integer.") + if not isinstance(entry["retry_latency_ms"], (int, float)): + raise ValueError("'retry_latency_ms' must be numeric.") + + +def compare_retry_overhead(log_data: List[Dict[str, Any]]) -> List[RetryAnalysis]: + """Vergleicht Retry-Overheads über verschiedene Parallelitätslevels und liefert aggregierte Metriken.""" + _validate_log_data(log_data) + + logger.info("Start comparing retry overheads across parallelism levels.") + + df = pd.DataFrame(log_data) + if df.empty: + raise ValueError("log_data is empty; no data to analyze.") + + grouped = df.groupby("parallelism_level")["retry_latency_ms"] + + results: List[RetryAnalysis] = [] + for level, latencies in grouped: + p50 = float(latencies.quantile(0.5)) + p95 = float(latencies.quantile(0.95)) + p99 = float(latencies.quantile(0.99)) + analysis: RetryAnalysis = { + "parallelism_level": int(level), + "p50": p50, + "p95": p95, + "p99": p99, + } + results.append(analysis) + logger.debug(f"Computed metrics for parallelism {level}: {analysis}") + + logger.info("Completed retry overhead comparison.") + return results \ No newline at end of file