Add trace_agg.py/src/trace_agg/core.py

This commit is contained in:
Mika 2026-01-18 17:11:09 +00:00
parent 8b69c4f49a
commit e114968ed0


@@ -0,0 +1,85 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any, Dict, List

import pandas as pd

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@dataclass
class TraceData:
    """Represents a single eBPF trace event."""

    corr_id: int
    event_type: str
    timestamp: float

    def __post_init__(self) -> None:
        if not isinstance(self.corr_id, int):
            raise TypeError("corr_id must be an int")
        if not isinstance(self.event_type, str):
            raise TypeError("event_type must be a str")
        if not isinstance(self.timestamp, (int, float)):
            raise TypeError("timestamp must be a number")
def analyze_trace(trace_data: List[TraceData]) -> Dict[str, Any]:
    """Analyze eBPF trace events and extract metrics over write_pre/write_post hooks."""
    if not isinstance(trace_data, list):
        raise TypeError("trace_data must be a list")
    if not trace_data:
        logger.warning("Empty trace data passed.")
        return {"by_corr_id": {}, "global_stats": {}}
    # Validate entries and convert to a DataFrame; non-TraceData entries are dropped
    validated = [entry for entry in trace_data if isinstance(entry, TraceData)]
    if not validated:
        raise ValueError("trace_data contains no TraceData entries")
    df = pd.DataFrame([e.__dict__ for e in validated])
    required_cols = {"corr_id", "event_type", "timestamp"}
    if not required_cols.issubset(df.columns):
        raise ValueError(f"Missing expected columns: {required_cols - set(df.columns)}")
    result: Dict[str, Any] = {"by_corr_id": {}, "global_stats": {}}
    for corr_id, group in df.groupby("corr_id"):
        group_sorted = group.sort_values("timestamp")
        write_pre = group_sorted[group_sorted["event_type"] == "write_pre"]
        write_post = group_sorted[group_sorted["event_type"] == "write_post"]
        reads = group_sorted[group_sorted["event_type"].str.startswith("read")]
        if not write_pre.empty and not write_post.empty:
            # Window spans from the first write_pre to the last write_post
            pre_time = write_pre["timestamp"].min()
            post_time = write_post["timestamp"].max()
            duration = post_time - pre_time
            # Reads landing inside the write window count as retry-free
            reads_between = reads[(reads["timestamp"] >= pre_time) & (reads["timestamp"] <= post_time)]
            retry_free_reads = len(reads_between)
        else:
            duration = None
            retry_free_reads = 0
        result["by_corr_id"][corr_id] = {
            "duration_window": duration,
            "retry_free_reads": retry_free_reads,
            "events_total": len(group_sorted),
        }
    # Global aggregation across all corr_ids
    durations = [v["duration_window"] for v in result["by_corr_id"].values() if v["duration_window"] is not None]
    if durations:
        result["global_stats"]["avg_duration"] = float(pd.Series(durations).mean())
        result["global_stats"]["max_duration"] = float(pd.Series(durations).max())
    else:
        result["global_stats"]["avg_duration"] = 0.0
        result["global_stats"]["max_duration"] = 0.0
    result["global_stats"]["total_corr_ids"] = len(result["by_corr_id"])
    logger.info(
        "Analysis complete: %d corr_ids, average duration %.6f",
        result["global_stats"]["total_corr_ids"],
        result["global_stats"]["avg_duration"],
    )
    return result
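
A minimal usage sketch (not part of this commit), assuming the package imports as trace_agg.core; the event values below are illustrative:

# Hypothetical usage sketch -- timestamps and event names are made up.
from trace_agg.core import TraceData, analyze_trace

events = [
    TraceData(corr_id=1, event_type="write_pre", timestamp=0.010),
    TraceData(corr_id=1, event_type="read_ok", timestamp=0.012),
    TraceData(corr_id=1, event_type="write_post", timestamp=0.015),
    TraceData(corr_id=2, event_type="read_ok", timestamp=0.020),
]
stats = analyze_trace(events)
# corr_id 1 has a complete write window (~0.005 s) containing one read;
# corr_id 2 lacks a write_pre/write_post pair, so its duration_window is None.
print(stats["by_corr_id"][1]["retry_free_reads"])   # 1
print(stats["global_stats"]["total_corr_ids"])      # 2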