Add stability_analysis/src/stability_analysis/core.py

This commit is contained in:
Mika 2026-03-14 17:17:11 +00:00
parent f159fdfb02
commit 7435c2e3e3

View file

@@ -0,0 +1,93 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import List, Dict, Any
import numpy as np
import pandas as pd
from statistics import mean, stdev
# Module-level logger following the standard `getLogger(__name__)` convention.
logger = logging.getLogger(__name__)
# NOTE(review): calling basicConfig at import time mutates the process-wide
# logging configuration for every application that imports this module —
# consider moving this call to the application entry point. Kept as-is here
# because removing it would change module import side effects.
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
@dataclass
class AnalysisResults:
    """Structured results of the stability analysis."""

    # Columns whose relative standard deviation fell below the stability
    # threshold.
    stable_cluster: List[str]
    # Per-column count of values outside the 2-sigma band.
    outlier_counts: Dict[str, int]
    # Human-readable per-column pattern descriptions.
    patterns: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the results into a plain dictionary."""
        keys = ('stable_cluster', 'outlier_counts', 'patterns')
        return {key: getattr(self, key) for key in keys}
class InputValidationError(Exception):
    """Raised when input data is malformed or incomplete."""
def _validate_data_list(data_list: List[Dict[str, Any]]) -> None:
if not isinstance(data_list, list):
raise InputValidationError("data_list muss eine Liste von Dicts sein.")
if not data_list:
raise InputValidationError("data_list darf nicht leer sein.")
for i, item in enumerate(data_list):
if not isinstance(item, dict):
raise InputValidationError(f"Eintrag {i} ist kein Dict.")
if len(item) == 0:
raise InputValidationError(f"Eintrag {i} ist leer.")
def analyze_data(data_list: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze runtime data to detect stable clusters, outliers and patterns.

    Args:
        data_list: Non-empty list of record dicts. At least one key must
            hold numeric values; non-numeric columns are ignored.

    Returns:
        Dict with keys ``stable_cluster`` (columns whose relative std is
        below 5%), ``outlier_counts`` (per-column count of values outside
        the 2-sigma band) and ``patterns`` (per-column descriptions).

    Raises:
        InputValidationError: If the input structure is invalid or no
            numeric columns are present.
    """
    logger.info("Starte Datenanalyse für %d Einträge.", len(data_list))
    _validate_data_list(data_list)
    try:
        df = pd.DataFrame(data_list)
    except Exception as e:
        logger.error("Fehler beim Erstellen des DataFrame: %s", e)
        raise InputValidationError("Ungültige Datenstruktur.") from e
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if not numeric_cols:
        raise InputValidationError("Keine numerischen Spalten für Analyse gefunden.")
    stable_clusters: List[str] = []
    outlier_counts: Dict[str, int] = {}
    patterns: List[str] = []
    for col in numeric_cols:
        values = df[col].dropna().to_numpy()
        if len(values) < 2:
            # Too few data points for a meaningful spread estimate.
            continue
        col_mean = np.mean(values)
        col_std = np.std(values)
        # Everything outside the 2-sigma band counts as an outlier.
        lower, upper = col_mean - 2 * col_std, col_mean + 2 * col_std
        outlier_mask = (values < lower) | (values > upper)
        outlier_counts[col] = int(np.sum(outlier_mask))
        # BUG FIX: normalize by abs(mean). Dividing by a negative mean made
        # rel_std negative, so highly variable columns with a negative mean
        # always passed the `rel_std < 0.05` check and were misclassified
        # as "sehr stabil".
        rel_std = (col_std / abs(col_mean)) if col_mean != 0 else float('inf')
        if rel_std < 0.05:
            stable_clusters.append(col)
            patterns.append(f"{col}: sehr stabil (rel_std={rel_std:.3f})")
        elif rel_std < 0.15:
            patterns.append(f"{col}: moderat stabil (rel_std={rel_std:.3f})")
        else:
            patterns.append(f"{col}: variabel (rel_std={rel_std:.3f})")
    result = AnalysisResults(
        stable_cluster=stable_clusters,
        outlier_counts=outlier_counts,
        patterns=patterns,
    )
    logger.info("Analyse abgeschlossen. %d stabile Cluster gefunden.", len(stable_clusters))
    # BUG FIX: dropped the former `assert isinstance(result.to_dict(), dict)` —
    # asserts are stripped under `python -O` and must not guard results
    # (and the check was trivially true).
    return result.to_dict()