From e114968ed0fc5b41e14edb0307dec9f2e5099ea8 Mon Sep 17 00:00:00 2001
From: Mika
Date: Sun, 18 Jan 2026 17:11:09 +0000
Subject: [PATCH] Add trace_agg.py/src/trace_agg/core.py

---
 trace_agg.py/src/trace_agg/core.py | 98 ++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)
 create mode 100644 trace_agg.py/src/trace_agg/core.py

diff --git a/trace_agg.py/src/trace_agg/core.py b/trace_agg.py/src/trace_agg/core.py
new file mode 100644
index 0000000..bc76f7f
--- /dev/null
+++ b/trace_agg.py/src/trace_agg/core.py
@@ -0,0 +1,98 @@
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from typing import Any, Dict, List
+
+import pandas as pd
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TraceData:
+    """A single eBPF trace event."""
+
+    corr_id: int
+    event_type: str
+    timestamp: float
+
+    def __post_init__(self) -> None:
+        if not isinstance(self.corr_id, int):
+            raise TypeError("corr_id must be an int")
+        if not isinstance(self.event_type, str):
+            raise TypeError("event_type must be a str")
+        if not isinstance(self.timestamp, (int, float)):
+            raise TypeError("timestamp must be a number")
+
+
+def analyze_trace(trace_data: List[TraceData]) -> Dict[str, Any]:
+    """Analyze eBPF trace events and derive metrics around write_pre/write_post hooks."""
+    if not isinstance(trace_data, list):
+        raise TypeError("trace_data must be a list")
+
+    empty_result: Dict[str, Any] = {
+        "by_corr_id": {},
+        "global_stats": {"avg_duration": 0.0, "max_duration": 0.0, "total_corr_ids": 0},
+    }
+    if not trace_data:
+        logger.warning("Empty trace data passed.")
+        return empty_result
+
+    # Validate entries and convert them to a DataFrame; non-TraceData entries are dropped.
+    validated = [entry for entry in trace_data if isinstance(entry, TraceData)]
+    if len(validated) < len(trace_data):
+        logger.warning("Dropped %d non-TraceData entries.", len(trace_data) - len(validated))
+    if not validated:
+        return empty_result
+
+    df = pd.DataFrame([e.__dict__ for e in validated])
+
+    required_cols = {"corr_id", "event_type", "timestamp"}
+    if not required_cols.issubset(df.columns):
+        raise ValueError(f"Missing expected columns: {required_cols - set(df.columns)}")
+
+    result: Dict[str, Any] = {"by_corr_id": {}, "global_stats": {}}
+
+    for corr_id, group in df.groupby("corr_id"):
+        group_sorted = group.sort_values("timestamp")
+
+        write_pre = group_sorted[group_sorted["event_type"] == "write_pre"]
+        write_post = group_sorted[group_sorted["event_type"] == "write_post"]
+        reads = group_sorted[group_sorted["event_type"].str.startswith("read")]
+
+        if not write_pre.empty and not write_post.empty:
+            # The write window spans from the first write_pre to the last write_post.
+            pre_time = write_pre["timestamp"].min()
+            post_time = write_post["timestamp"].max()
+            duration = post_time - pre_time
+            reads_between = reads[(reads["timestamp"] >= pre_time) & (reads["timestamp"] <= post_time)]
+            retry_free_reads = len(reads_between)
+        else:
+            duration = None
+            retry_free_reads = 0
+
+        result["by_corr_id"][int(corr_id)] = {
+            "duration_window": duration,
+            "retry_free_reads": retry_free_reads,
+            "events_total": len(group_sorted),
+        }
+
+    # Global aggregates over all correlation IDs with a complete write window.
+    durations = [v["duration_window"] for v in result["by_corr_id"].values() if v["duration_window"] is not None]
+    if durations:
+        result["global_stats"]["avg_duration"] = float(pd.Series(durations).mean())
+        result["global_stats"]["max_duration"] = float(pd.Series(durations).max())
+    else:
+        result["global_stats"]["avg_duration"] = 0.0
+        result["global_stats"]["max_duration"] = 0.0
+
+    result["global_stats"]["total_corr_ids"] = len(result["by_corr_id"])
+
+    logger.info(
+        "Analysis finished: %d corr_ids, average duration %.6f",
+        result["global_stats"]["total_corr_ids"],
+        result["global_stats"]["avg_duration"],
+    )
+
+    return result
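
Usage sketch: a minimal example of feeding events into analyze_trace. Only
write_pre and write_post are event names taken from the module; "read_page"
and all timestamps are invented for illustration (any event type starting
with "read" is counted).

    import logging
    from trace_agg.core import TraceData, analyze_trace

    logging.basicConfig(level=logging.INFO)

    # Hypothetical events: corr_id 1 has a full write window, corr_id 2 does not.
    events = [
        TraceData(corr_id=1, event_type="write_pre", timestamp=0.001),
        TraceData(corr_id=1, event_type="read_page", timestamp=0.002),
        TraceData(corr_id=1, event_type="write_post", timestamp=0.004),
        TraceData(corr_id=2, event_type="read_page", timestamp=0.005),
    ]

    stats = analyze_trace(events)
    # corr_id 1: duration_window ~0.003 with one read inside the window;
    # corr_id 2: no write_pre/write_post pair, so duration_window is None.
    print(stats["by_corr_id"][1])
    print(stats["global_stats"])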