# Module: unknowns_analysis/src/unknowns_analysis/core.py
# Aggregates unknown counts and relates their row-to-row changes to the
# row-to-row changes of the WARN rate.
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import List, Dict, Any

import pandas as pd


logger = logging.getLogger(__name__)


@dataclass
class UnknownAnalysisResult:
    """Data model for the result of the unknowns analysis."""

    total_unknowns: int
    warn_increases: int
    warn_stable: int

    def to_dict(self) -> Dict[str, int]:
        """Return the three counters as a plain dictionary keyed by field name."""
        return {
            "total_unknowns": self.total_unknowns,
            "warn_increases": self.warn_increases,
            "warn_stable": self.warn_stable,
        }


def analyze_unknowns(unknowns_data: List[Dict[str, Any]], warn_data: List[Dict[str, Any]]) -> Dict[str, int]:
    """Analyze unknown data in the context of WARN metrics.

    Entries of both inputs are paired by position. If the inputs differ in
    length, a warning is logged and both are truncated to the shorter length.

    Args:
        unknowns_data: List of dictionaries with unknown metrics per replay
            window; each must provide the key ``count_unknowns``.
        warn_data: List of dictionaries with WARN decisions or metrics; each
            must provide the key ``warn_rate``.

    Returns:
        Dictionary with the aggregated results:
        ``total_unknowns`` is the sum of all ``count_unknowns`` values,
        ``warn_increases`` is the number of rows where both ``count_unknowns``
        and ``warn_rate`` rose compared to the previous row, and
        ``warn_stable`` is the number of rows where ``count_unknowns`` rose
        while ``warn_rate`` stayed exactly the same.
        All three are 0 when either input yields an empty DataFrame.

    Raises:
        TypeError: If either argument is not a list of dictionaries.
        ValueError: If ``count_unknowns`` or ``warn_rate`` is missing from the
            respective (non-empty) input.
    """
    if not isinstance(unknowns_data, list) or not all(isinstance(e, dict) for e in unknowns_data):
        raise TypeError("unknowns_data muss eine Liste von Dictionaries sein.")
    if not isinstance(warn_data, list) or not all(isinstance(e, dict) for e in warn_data):
        raise TypeError("warn_data muss eine Liste von Dictionaries sein.")

    if len(unknowns_data) != len(warn_data):
        logger.warning("Unterschiedliche Längen der Eingabedaten. Kürze auf gemeinsame Länge.")
        min_len = min(len(unknowns_data), len(warn_data))
        unknowns_data = unknowns_data[:min_len]
        warn_data = warn_data[:min_len]

    df_unk = pd.DataFrame(unknowns_data)
    df_warn = pd.DataFrame(warn_data)

    # Nothing to aggregate: report zeros instead of failing column validation.
    if df_unk.empty or df_warn.empty:
        result = UnknownAnalysisResult(total_unknowns=0, warn_increases=0, warn_stable=0)
        return result.to_dict()

    # Validate the expected columns.
    if 'count_unknowns' not in df_unk.columns:
        raise ValueError("Spalte 'count_unknowns' fehlt in unknowns_data.")
    if 'warn_rate' not in df_warn.columns:
        raise ValueError("Spalte 'warn_rate' fehlt in warn_data.")

    # Both frames have the same length and default index here, so the
    # column-wise concat pairs the rows by position.
    df = pd.concat([df_unk['count_unknowns'], df_warn['warn_rate']], axis=1)

    # The first row has no predecessor; diff() yields NaN there, which is
    # replaced by 0 so the first row never counts as a rise.
    df['unknown_change'] = df['count_unknowns'].diff().fillna(0)
    df['warn_change'] = df['warn_rate'].diff().fillna(0)

    unknowns_rising = df['unknown_change'] > 0

    total_unknowns = int(df['count_unknowns'].sum())
    # Sum the boolean mask first, then convert the count to int. Calling
    # int() on the mask itself fails for any Series that is not length 1.
    warn_increases = int((unknowns_rising & (df['warn_change'] > 0)).sum())
    warn_stable = int((unknowns_rising & (df['warn_change'] == 0)).sum())

    result = UnknownAnalysisResult(
        total_unknowns=total_unknowns,
        warn_increases=warn_increases,
        warn_stable=warn_stable,
    )

    logger.info("Analyse abgeschlossen: %s", result.to_dict())

    return result.to_dict()