diff --git a/stability_analysis/src/stability_analysis/core.py b/stability_analysis/src/stability_analysis/core.py new file mode 100644 index 0000000..da02f17 --- /dev/null +++ b/stability_analysis/src/stability_analysis/core.py @@ -0,0 +1,93 @@ +from __future__ import annotations +import json +import logging +from dataclasses import dataclass +from typing import List, Dict, Any +import numpy as np +import pandas as pd +from statistics import mean, stdev + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') + + +@dataclass +class AnalysisResults: + """Strukturierte Ergebnisse der Stabilitätsanalyse.""" + stable_cluster: List[str] + outlier_counts: Dict[str, int] + patterns: List[str] + + def to_dict(self) -> Dict[str, Any]: + return { + 'stable_cluster': self.stable_cluster, + 'outlier_counts': self.outlier_counts, + 'patterns': self.patterns, + } + + +class InputValidationError(Exception): + """Wird ausgelöst, wenn Eingabedaten fehlerhaft oder unvollständig sind.""" + pass + + +def _validate_data_list(data_list: List[Dict[str, Any]]) -> None: + if not isinstance(data_list, list): + raise InputValidationError("data_list muss eine Liste von Dicts sein.") + if not data_list: + raise InputValidationError("data_list darf nicht leer sein.") + for i, item in enumerate(data_list): + if not isinstance(item, dict): + raise InputValidationError(f"Eintrag {i} ist kein Dict.") + if len(item) == 0: + raise InputValidationError(f"Eintrag {i} ist leer.") + + +def analyze_data(data_list: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analysiert Laufzeitdaten, um stabile Cluster, Ausreißer und Muster zu erkennen.""" + logger.info("Starte Datenanalyse für %d Einträge.", len(data_list)) + _validate_data_list(data_list) + + try: + df = pd.DataFrame(data_list) + except Exception as e: + logger.error("Fehler beim Erstellen des DataFrame: %s", e) + raise InputValidationError("Ungültige Datenstruktur.") from e + + numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() + if not numeric_cols: + raise InputValidationError("Keine numerischen Spalten für Analyse gefunden.") + + stable_clusters: List[str] = [] + outlier_counts: Dict[str, int] = {} + patterns: List[str] = [] + + for col in numeric_cols: + values = df[col].dropna().to_numpy() + if len(values) < 2: + continue + col_mean = np.mean(values) + col_std = np.std(values) + lower, upper = col_mean - 2 * col_std, col_mean + 2 * col_std + outlier_mask = (values < lower) | (values > upper) + outlier_count = int(np.sum(outlier_mask)) + outlier_counts[col] = outlier_count + + rel_std = (col_std / col_mean) if col_mean != 0 else float('inf') + if rel_std < 0.05: + stable_clusters.append(col) + patterns.append(f"{col}: sehr stabil (rel_std={rel_std:.3f})") + elif rel_std < 0.15: + patterns.append(f"{col}: moderat stabil (rel_std={rel_std:.3f})") + else: + patterns.append(f"{col}: variabel (rel_std={rel_std:.3f})") + + result = AnalysisResults( + stable_cluster=stable_clusters, + outlier_counts=outlier_counts, + patterns=patterns, + ) + + logger.info("Analyse abgeschlossen. %d stabile Cluster gefunden.", len(stable_clusters)) + assert isinstance(result.to_dict(), dict), "Analyseergebnis muss dict sein" + return result.to_dict() \ No newline at end of file