Add unknowns_analysis/src/unknowns_analysis/core.py
This commit is contained in:
parent
2e6b5189d2
commit
d46ded9789
1 changed files with 79 additions and 0 deletions
79
unknowns_analysis/src/unknowns_analysis/core.py
Normal file
79
unknowns_analysis/src/unknowns_analysis/core.py
Normal file
|
|
@ -0,0 +1,79 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class UnknownAnalysisResult:
|
||||||
|
"""Datenmodell für das Ergebnis der Unknowns-Analyse."""
|
||||||
|
|
||||||
|
total_unknowns: int
|
||||||
|
warn_increases: int
|
||||||
|
warn_stable: int
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, int]:
|
||||||
|
return {
|
||||||
|
"total_unknowns": self.total_unknowns,
|
||||||
|
"warn_increases": self.warn_increases,
|
||||||
|
"warn_stable": self.warn_stable,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def analyze_unknowns(unknowns_data: List[Dict[str, Any]], warn_data: List[Dict[str, Any]]) -> Dict[str, int]:
|
||||||
|
"""Analysiert Unknown-Daten im Kontext von WARN-Metriken.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
unknowns_data: Liste von Dictionaries mit Unknown-Metriken pro Replay-Window.
|
||||||
|
warn_data: Liste von Dictionaries mit WARN-Entscheidungen oder Metriken.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mit aggregierten Analyseergebnissen:
|
||||||
|
total_unknowns, warn_increases, warn_stable.
|
||||||
|
"""
|
||||||
|
if not isinstance(unknowns_data, list) or not all(isinstance(e, dict) for e in unknowns_data):
|
||||||
|
raise TypeError("unknowns_data muss eine Liste von Dictionaries sein.")
|
||||||
|
if not isinstance(warn_data, list) or not all(isinstance(e, dict) for e in warn_data):
|
||||||
|
raise TypeError("warn_data muss eine Liste von Dictionaries sein.")
|
||||||
|
|
||||||
|
if len(unknowns_data) != len(warn_data):
|
||||||
|
logger.warning("Unterschiedliche Längen der Eingabedaten. Kürze auf gemeinsame Länge.")
|
||||||
|
min_len = min(len(unknowns_data), len(warn_data))
|
||||||
|
unknowns_data = unknowns_data[:min_len]
|
||||||
|
warn_data = warn_data[:min_len]
|
||||||
|
|
||||||
|
df_unk = pd.DataFrame(unknowns_data)
|
||||||
|
df_warn = pd.DataFrame(warn_data)
|
||||||
|
|
||||||
|
if df_unk.empty or df_warn.empty:
|
||||||
|
result = UnknownAnalysisResult(total_unknowns=0, warn_increases=0, warn_stable=0)
|
||||||
|
return result.to_dict()
|
||||||
|
|
||||||
|
# Erwartete Spalten validieren
|
||||||
|
if 'count_unknowns' not in df_unk.columns:
|
||||||
|
raise ValueError("Spalte 'count_unknowns' fehlt in unknowns_data.")
|
||||||
|
if 'warn_rate' not in df_warn.columns:
|
||||||
|
raise ValueError("Spalte 'warn_rate' fehlt in warn_data.")
|
||||||
|
|
||||||
|
df = pd.concat([df_unk['count_unknowns'], df_warn['warn_rate']], axis=1)
|
||||||
|
|
||||||
|
df['unknown_change'] = df['count_unknowns'].diff().fillna(0)
|
||||||
|
df['warn_change'] = df['warn_rate'].diff().fillna(0)
|
||||||
|
|
||||||
|
total_unknowns = int(df['count_unknowns'].sum())
|
||||||
|
warn_increases = int((df['unknown_change'] > 0) & (df['warn_change'] > 0)).sum()
|
||||||
|
warn_stable = int((df['unknown_change'] > 0) & (df['warn_change'] == 0)).sum()
|
||||||
|
|
||||||
|
result = UnknownAnalysisResult(
|
||||||
|
total_unknowns=total_unknowns,
|
||||||
|
warn_increases=warn_increases,
|
||||||
|
warn_stable=warn_stable,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info("Analyse abgeschlossen: %s", result.to_dict())
|
||||||
|
|
||||||
|
return result.to_dict()
|
||||||
Loading…
Reference in a new issue