Add analysis_tool/src/analysis_tool/core.py

This commit is contained in:
Mika 2026-04-05 13:56:52 +00:00
parent 99f46deadb
commit dd1b9218d4

View file

@@ -0,0 +1,127 @@
from __future__ import annotations
import argparse
import json
import logging
from pathlib import Path
from statistics import median
from typing import List, Dict
import pandas as pd
# Configure the root logger once at import time: INFO level, bracketed level tag.
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
class AnalysisSummary:
    """Container for an analysis result: validity flag plus key statistics.

    Fields mirror the constructor arguments; every value is coerced to a
    plain ``bool``/``float`` so serialization never sees foreign scalar types.
    """

    def __init__(self, valid: bool, median: float, IQR: float, band_width: float, delta_band_width: float) -> None:
        # Eager coercion: callers may pass numpy scalars or ints.
        self.valid = bool(valid)
        self.median = float(median)
        self.IQR = float(IQR)
        self.band_width = float(band_width)
        self.delta_band_width = float(delta_band_width)

    def to_dict(self) -> Dict[str, float | bool]:
        """Return a JSON-serializable mapping of all fields (fixed key order)."""
        field_names = ("valid", "median", "IQR", "band_width", "delta_band_width")
        return {name: getattr(self, name) for name in field_names}

    def __repr__(self) -> str:
        return (
            f"AnalysisSummary(valid={self.valid}, median={self.median:.3f}, "
            f"IQR={self.IQR:.3f}, band_width={self.band_width:.3f}, "
            f"delta_band_width={self.delta_band_width:.3f})"
        )
def analyze_results(log_entries: List[Dict]) -> AnalysisSummary:
    """Analyze preflight log entries and derive summary statistics.

    Computes the overall median, inter-quartile range (IQR) and band width of
    ``measured_p`` over all freeze-OK entries, plus the absolute band-width
    difference between the first two setup-fingerprint groups.

    Args:
        log_entries: List of dicts, each providing at least the keys
            ``timestamp``, ``measured_p``, ``freeze_ok``,
            ``setup_fingerprint`` and ``policy_hash``.

    Returns:
        AnalysisSummary. ``valid`` is True only when at least two setup
        groups have two or more parseable measurements each AND all retained
        entries share a single ``policy_hash``.

    Raises:
        TypeError: If ``log_entries`` is not a list of dicts.
        ValueError: If a required column is missing.
    """
    if not isinstance(log_entries, list) or not all(isinstance(item, dict) for item in log_entries):
        raise TypeError("log_entries muss eine Liste von Dictionaries sein.")
    df = pd.DataFrame(log_entries)
    required_cols = {"timestamp", "measured_p", "freeze_ok", "setup_fingerprint", "policy_hash"}
    if not required_cols.issubset(df.columns):
        raise ValueError(f"Fehlende Spalten in log_entries: {required_cols - set(df.columns)}")
    # Keep only rows explicitly flagged freeze-OK. Equality (not truthiness)
    # is deliberate: it preserves the original semantics for non-bool values.
    df = df[df["freeze_ok"] == True]  # noqa: E712
    if df.empty:
        logging.warning("Keine gültigen Freeze-OK-Einträge gefunden.")
        return AnalysisSummary(False, 0.0, 0.0, 0.0, 0.0)
    # Per-setup band widths. Groups whose values cannot be parsed as floats,
    # or that have fewer than two readings, do not contribute — a single
    # reading cannot form a band.
    group_bandwidths: Dict[str, float] = {}
    for name, group in df.groupby("setup_fingerprint"):
        try:
            values = group["measured_p"].astype(float).tolist()
        except (TypeError, ValueError):
            # astype(float) raises ValueError for unparseable strings and
            # TypeError for non-numeric objects — both mean "bad data".
            logging.error("Ungültige Werte in setup_fingerprint %s.", name)
            continue
        if len(values) < 2:
            continue
        # Use the validated float values (the raw column might hold numeric
        # strings, on which max - min would fail).
        group_bandwidths[name] = max(values) - min(values)
    overall_median = float(median(df["measured_p"]))
    overall_iqr = float(df["measured_p"].quantile(0.75) - df["measured_p"].quantile(0.25))
    overall_bw = float(df["measured_p"].max() - df["measured_p"].min())
    if len(group_bandwidths) < 2:
        logging.warning("Nicht genügend Gruppen für Vergleich.")
        return AnalysisSummary(False, overall_median, overall_iqr, overall_bw, 0.0)
    # NOTE(review): only the first two groups (insertion order) are compared,
    # even when more exist — presumably the aux=2 vs aux=3 case; confirm.
    first, second = list(group_bandwidths)[:2]
    delta_bw = abs(group_bandwidths[first] - group_bandwidths[second])
    overall_valid = len(set(df["policy_hash"])) == 1
    return AnalysisSummary(overall_valid, overall_median, overall_iqr, overall_bw, float(delta_bw))
def _load_csv(path: Path) -> List[Dict]:
df = pd.read_csv(path)
expected_cols = ["timestamp", "measured_p", "freeze_ok", "setup_fingerprint", "policy_hash"]
missing = [c for c in expected_cols if c not in df.columns]
if missing:
raise ValueError(f"CSV fehlt Spalten: {missing}")
df["freeze_ok"] = df["freeze_ok"].astype(bool)
entries = df.to_dict(orient="records")
return entries
def _save_json(path: Path, summary: AnalysisSummary) -> None:
with open(path, 'w', encoding='utf-8') as f:
json.dump(summary.to_dict(), f, indent=2, ensure_ascii=False)
def main() -> None:
    """CLI entry point: read a preflight CSV, analyze it, write a JSON summary."""
    parser = argparse.ArgumentParser(description="Analyse von Preflight-Logdaten im Freeze-Band.")
    parser.add_argument("--input", required=True, help="Pfad zur Eingabe-CSV-Datei mit Preflight-Daten.")
    parser.add_argument("--output", required=True, help="Pfad zur Ausgabe-JSON-Datei für Analyseergebnisse.")
    args = parser.parse_args()

    src = Path(args.input)
    dst = Path(args.output)

    logging.info("Lade Daten aus %s ...", src)
    records = _load_csv(src)
    logging.info("Analysiere Ergebnisse ...")
    result = analyze_results(records)
    logging.info("Schreibe Analyseergebnis nach %s ...", dst)
    _save_json(dst, result)
    logging.info("Analyse abgeschlossen.")


if __name__ == "__main__":
    main()