from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any
import pandas as pd


# Configure basic logging for CI readiness.
# NOTE(review): this runs at import time and therefore configures the root
# logger for every importer; kept because callers may rely on it.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class _RunDataListError(TypeError, AssertionError):
    """Raised when ``analyze_runs`` receives something other than a list.

    The check used to be an ``assert`` statement. Deriving from
    ``AssertionError`` as well as ``TypeError`` keeps existing
    ``except AssertionError`` handlers working while the check itself no
    longer disappears under ``python -O``.
    """


@dataclass
class RunData:
    """A single run record."""

    run_id: str
    timestamp: datetime
    delta_t: float
    expiring_at: datetime

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> RunData:
        """Validate a dictionary and build a ``RunData`` object from it.

        Args:
            data: Mapping that must contain the keys ``run_id``,
                ``timestamp``, ``delta_t`` and ``expiring_at``.

        Returns:
            The constructed ``RunData`` instance. ``run_id`` is passed through
            ``str``, ``delta_t`` through ``float`` and both time fields
            through ``pd.to_datetime(...).to_pydatetime()``.

        Raises:
            ValueError: If a required key is missing, or if any value cannot
                be converted (the underlying error is chained as the cause).
        """
        required_fields = {"run_id", "timestamp", "delta_t", "expiring_at"}
        missing = required_fields.difference(data)
        if missing:
            raise ValueError(f"Fehlende Felder in RunData: {missing}")

        try:
            return cls(
                run_id=str(data["run_id"]),
                timestamp=pd.to_datetime(data["timestamp"]).to_pydatetime(),
                delta_t=float(data["delta_t"]),
                expiring_at=pd.to_datetime(data["expiring_at"]).to_pydatetime(),
            )
        except Exception as e:
            # Every conversion failure is reported uniformly as ValueError so
            # callers only need to handle one exception type.
            raise ValueError(f"Ungültige Feldwerte in RunData: {e}") from e


def analyze_runs(run_data: List[RunData]) -> Dict[str, Any]:
    """Analyse run data and summarise the cases where ``delta_t`` is negative.

    Args:
        run_data: List of ``RunData`` objects.

    Returns:
        A dict with the keys ``total_runs`` (int), ``negative_dt_count`` (int)
        and ``negative_dt_ratio`` (float, 0.0 for an empty list). When at
        least one run has ``delta_t < 0`` the dict additionally contains
        ``negative_runs``: one record per ``run_id`` with the keys ``run_id``,
        ``count`` and ``mean_delta_t``, computed over the negative rows only.

    Raises:
        TypeError: If ``run_data`` is not a list (the raised exception is also
            an ``AssertionError`` for backward compatibility), or if any
            element is not a ``RunData`` instance.
    """
    # Explicit raise instead of ``assert``: assertions are removed under
    # ``python -O``, which would let e.g. ``None`` fall into the empty branch.
    if not isinstance(run_data, list):
        raise _RunDataListError("run_data muss eine Liste sein.")

    if not run_data:
        logger.warning("Leere RunData-Liste übergeben.")
        return {"total_runs": 0, "negative_dt_count": 0, "negative_dt_ratio": 0.0}

    # Validate the elements before touching any of their attributes.
    for rd in run_data:
        if not isinstance(rd, RunData):
            raise TypeError(f"Ungültiger Typ in run_data: {type(rd)}")

    # Only the columns the analysis actually reads are materialised.
    df = pd.DataFrame(
        [{"run_id": r.run_id, "delta_t": r.delta_t} for r in run_data]
    )

    # The list is non-empty here, so ``total`` is always >= 1.
    total = len(run_data)
    negative_mask = df["delta_t"] < 0
    neg_count = int(negative_mask.sum())

    result: Dict[str, Any] = {
        "total_runs": total,
        "negative_dt_count": neg_count,
        "negative_dt_ratio": float(neg_count / total),
    }

    # Per-run_id breakdown of the negative cases for deeper analysis.
    if neg_count > 0:
        by_run = (
            df[negative_mask]
            .groupby("run_id")
            .agg(count=("delta_t", "size"), mean_delta_t=("delta_t", "mean"))
            .reset_index()
        )
        result["negative_runs"] = by_run.to_dict(orient="records")

    logger.info(
        "Analyse abgeschlossen: total=%d, negative=%d (%.2f%%)",
        result["total_runs"],
        result["negative_dt_count"],
        result["negative_dt_ratio"] * 100,
    )

    return result