Add bandwidth_analysis/src/bandwidth_analysis/core.py

This commit is contained in:
Mika 2026-03-23 11:13:27 +00:00
parent b7f7c50e07
commit 6aa6e5e85d

View file

@ -0,0 +1,66 @@
from __future__ import annotations
import pandas as pd
from typing import Any, Dict, List, Union, NamedTuple
def _validate_input(data: Union[Dict[str, Any], pd.DataFrame], name: str) -> pd.DataFrame:
    """Normalize *data* into a DataFrame and check its required columns.

    Args:
        data: A dict (handed to ``pd.DataFrame``) or a DataFrame (a copy of
            it is returned).
        name: Label of the argument, used only in error messages.

    Returns:
        A DataFrame that contains at least the columns ``segment``,
        ``bandwidth`` and ``retry_tail``.

    Raises:
        ValueError: If *data* is ``None`` or a required column is missing.
        TypeError: If *data* is neither a dict nor a DataFrame.
    """
    if data is None:
        raise ValueError(f"{name} darf nicht None sein.")

    if isinstance(data, pd.DataFrame):
        frame = data.copy()
    elif isinstance(data, dict):
        frame = pd.DataFrame(data)
    else:
        raise TypeError(f"{name} muss ein dict oder pandas.DataFrame sein, nicht {type(data).__name__}.")

    required_columns = {"segment", "bandwidth", "retry_tail"}
    # A non-empty difference means at least one required column is absent.
    if required_columns - set(frame.columns):
        raise ValueError(f"{name} fehlt eine oder mehrere der erforderlichen Spalten: {required_columns}.")
    return frame
class AnalysisResult(NamedTuple):
    """Immutable result record returned by ``analyze_bandwidth``."""

    # Relative change of the mean bandwidth (test vs. baseline).
    bandwidth_change: float
    # Relative change of the mean retry-tail value (test vs. baseline).
    retry_tail_change: float
    # Labels of the segments flagged as hotspots.
    hotspot_segments: List[str]
def _relative_change(base: pd.Series, test: pd.Series) -> pd.Series:
    """Return ``(test - base) / base`` element-wise, NaN where ``base`` is 0."""
    # Series.where masks zero denominators with NaN while keeping a numeric
    # dtype; replacing zeros with pd.NA would turn the column into object
    # dtype and make the later abs()/comparison depend on the pandas version.
    return (test - base) / base.where(base != 0)


def analyze_bandwidth(
    baseline_data: Union[Dict[str, Any], pd.DataFrame],
    test_data: Union[Dict[str, Any], pd.DataFrame],
    *,
    hotspot_threshold: float = 0.1,
) -> AnalysisResult:
    """Compare baseline and test data for bandwidth change, tail behavior and hotspots.

    The overall changes are the relative differences between the column
    means of the test data and the baseline data. Hotspots are determined
    per segment, using only segments that occur in both inputs.

    Args:
        baseline_data: Reference measurements as a dict or DataFrame with the
            columns ``segment``, ``bandwidth`` and ``retry_tail``.
        test_data: Measurements to compare, in the same layout.
        hotspot_threshold: Absolute relative change above which a segment is
            reported as a hotspot (default 0.1).

    Returns:
        An ``AnalysisResult`` with the relative change of the mean bandwidth,
        the relative change of the mean retry-tail value, and the labels of
        the hotspot segments (each label at most once, in first-seen order).

    Raises:
        ValueError: If the baseline mean of ``bandwidth`` or ``retry_tail``
            is 0, because the relative change is undefined in that case.
            Errors raised by ``_validate_input`` propagate unchanged.
    """
    baseline_df = _validate_input(baseline_data, "baseline_data")
    test_df = _validate_input(test_data, "test_data")

    baseline_mean_bw = baseline_df["bandwidth"].mean()
    test_mean_bw = test_df["bandwidth"].mean()
    baseline_mean_rt = baseline_df["retry_tail"].mean()
    test_mean_rt = test_df["retry_tail"].mean()

    if baseline_mean_bw == 0:
        raise ValueError("Baseline Bandwidth darf nicht 0 sein.")
    if baseline_mean_rt == 0:
        raise ValueError("Baseline Retry-Tail darf nicht 0 sein.")

    bandwidth_change = (test_mean_bw - baseline_mean_bw) / baseline_mean_bw
    retry_tail_change = (test_mean_rt - baseline_mean_rt) / baseline_mean_rt

    # Inner join on the segment label: only segments present in both inputs
    # are candidates for hotspot detection.
    merged = pd.merge(
        baseline_df.set_index("segment"),
        test_df.set_index("segment"),
        on="segment",
        how="inner",
        suffixes=("_base", "_test"),
    )

    bw_rel_change = _relative_change(merged["bandwidth_base"], merged["bandwidth_test"])
    rt_rel_change = _relative_change(merged["retry_tail_base"], merged["retry_tail_test"])

    # Comparisons against NaN are False, so a segment whose baseline value is
    # 0 is never flagged through that metric.
    hotspot_mask = (bw_rel_change.abs() > hotspot_threshold) | (
        rt_rel_change.abs() > hotspot_threshold
    )

    # Repeated segment rows multiply in the merge; report each label once.
    hotspot_segments = merged.index[hotspot_mask].unique().tolist()

    return AnalysisResult(
        bandwidth_change=float(bandwidth_change),
        retry_tail_change=float(retry_tail_change),
        hotspot_segments=hotspot_segments,
    )