from __future__ import annotations

import argparse
import json
import logging
from pathlib import Path
from statistics import median
from typing import Dict, List

import pandas as pd


logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')


class AnalysisSummary:
    """Analysis result: validity status plus the computed band statistics."""

    def __init__(self, valid: bool, median: float, IQR: float, band_width: float, delta_band_width: float) -> None:
        # Coerce defensively so downstream JSON serialization always receives
        # plain Python bool/float (pandas may hand us numpy scalars).
        self.valid = bool(valid)
        self.median = float(median)
        self.IQR = float(IQR)
        self.band_width = float(band_width)
        self.delta_band_width = float(delta_band_width)

    def to_dict(self) -> Dict[str, float | bool]:
        """Return a JSON-serializable representation of the summary."""
        return {
            "valid": self.valid,
            "median": self.median,
            "IQR": self.IQR,
            "band_width": self.band_width,
            "delta_band_width": self.delta_band_width,
        }

    def __repr__(self) -> str:
        return f"AnalysisSummary(valid={self.valid}, median={self.median:.3f}, IQR={self.IQR:.3f}, band_width={self.band_width:.3f}, delta_band_width={self.delta_band_width:.3f})"


def analyze_results(log_entries: List[Dict]) -> AnalysisSummary:
    """Analyze a list of preflight log entries.

    Computes median, IQR and band width over all freeze-OK entries, plus the
    absolute band-width difference between two ``setup_fingerprint`` groups.

    Args:
        log_entries: List of dicts, each with keys ``timestamp``,
            ``measured_p``, ``freeze_ok``, ``setup_fingerprint`` and
            ``policy_hash``.

    Returns:
        AnalysisSummary. ``valid`` is False when no freeze-OK entries exist or
        fewer than two comparable groups remain; otherwise it is True exactly
        when all retained entries share a single ``policy_hash``.

    Raises:
        TypeError: if ``log_entries`` is not a list of dicts.
        ValueError: if required columns are missing.
    """
    if not isinstance(log_entries, list) or not all(isinstance(item, dict) for item in log_entries):
        raise TypeError("log_entries muss eine Liste von Dictionaries sein.")

    df = pd.DataFrame(log_entries)
    required_cols = {"timestamp", "measured_p", "freeze_ok", "setup_fingerprint", "policy_hash"}
    if not required_cols.issubset(df.columns):
        raise ValueError(f"Fehlende Spalten in log_entries: {required_cols - set(df.columns)}")

    # Keep only entries whose freeze check passed. `.eq(True)` preserves the
    # original `== True` semantics (works on object-dtype columns) without the
    # E712 comparison-to-True idiom violation.
    df = df[df["freeze_ok"].eq(True)]
    if df.empty:
        logging.warning("Keine gültigen Freeze-OK-Einträge gefunden.")
        return AnalysisSummary(False, 0.0, 0.0, 0.0, 0.0)

    group_medians: Dict = {}
    group_bandwidths: Dict = {}

    for name, group in df.groupby("setup_fingerprint"):
        try:
            values = group["measured_p"].astype(float).tolist()
        except ValueError:
            # Skip groups with non-numeric measurements instead of failing the
            # whole analysis (lazy %-args avoid f-string work when suppressed).
            logging.error("Ungültige Werte in setup_fingerprint %s.", name)
            continue
        if len(values) < 2:
            # A single measurement has no spread; exclude it from comparison.
            continue

        # The original also computed a per-group IQR here, but never used it;
        # that dead computation has been removed.
        group_medians[name] = median(values)
        group_bandwidths[name] = group["measured_p"].max() - group["measured_p"].min()

    if len(group_medians) < 2:
        logging.warning("Nicht genügend Gruppen für Vergleich.")
        return AnalysisSummary(
            False,
            float(median(df["measured_p"])),
            float(df["measured_p"].quantile(0.75) - df["measured_p"].quantile(0.25)),
            float(df["measured_p"].max() - df["measured_p"].min()),
            0.0,
        )

    # NOTE(review): only the first two fingerprints (in groupby sort order) are
    # compared; with more than two groups the rest are silently ignored —
    # confirm this is intended.
    keys = list(group_bandwidths.keys())
    delta_bw = abs(group_bandwidths[keys[0]] - group_bandwidths[keys[1]])

    # Valid only when every retained entry was produced under one policy.
    overall_valid = len(set(df["policy_hash"])) == 1

    overall_median = float(median(df["measured_p"]))
    overall_iqr = float(df["measured_p"].quantile(0.75) - df["measured_p"].quantile(0.25))
    overall_bw = float(df["measured_p"].max() - df["measured_p"].min())

    return AnalysisSummary(overall_valid, overall_median, overall_iqr, overall_bw, float(delta_bw))


def _load_csv(path: Path) -> List[Dict]:
    """Load preflight entries from a CSV file and validate its columns.

    Raises:
        ValueError: if any expected column is missing.
    """
    df = pd.read_csv(path)
    expected_cols = ["timestamp", "measured_p", "freeze_ok", "setup_fingerprint", "policy_hash"]
    missing = [c for c in expected_cols if c not in df.columns]
    if missing:
        raise ValueError(f"CSV fehlt Spalten: {missing}")

    # NOTE(review): astype(bool) maps any non-empty string (even "false") to
    # True; pandas only auto-parses the literals True/False to bool dtype —
    # confirm the CSV source uses those literals.
    df["freeze_ok"] = df["freeze_ok"].astype(bool)
    return df.to_dict(orient="records")


def _save_json(path: Path, summary: AnalysisSummary) -> None:
    """Write the summary as pretty-printed UTF-8 JSON to *path*."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(summary.to_dict(), f, indent=2, ensure_ascii=False)


def main() -> None:
    """CLI entry point: read a preflight CSV, analyze it, write a JSON summary."""
    parser = argparse.ArgumentParser(description="Analyse von Preflight-Logdaten im Freeze-Band.")
    parser.add_argument("--input", required=True, help="Pfad zur Eingabe-CSV-Datei mit Preflight-Daten.")
    parser.add_argument("--output", required=True, help="Pfad zur Ausgabe-JSON-Datei für Analyseergebnisse.")
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    logging.info("Lade Daten aus %s ...", input_path)
    entries = _load_csv(input_path)

    logging.info("Analysiere Ergebnisse ...")
    summary = analyze_results(entries)

    logging.info("Schreibe Analyseergebnis nach %s ...", output_path)
    _save_json(output_path, summary)
    logging.info("Analyse abgeschlossen.")


if __name__ == "__main__":
    main()