from __future__ import annotations

import logging
from dataclasses import dataclass, field
from itertools import combinations
from statistics import mean, stdev
from typing import Any, List

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# DataFrame column names used by the analysis; "sound" is filled from
# ``LogEntry.sound_level``.
_SENSOR_COLUMNS = ("luminosity", "sound", "temperature", "inference")


@dataclass
class AnalysisResult:
    """Container for the results of a sensor data analysis.

    Both fields default to a fresh empty list, so ``AnalysisResult()`` is
    valid; positional and keyword construction work as well.

    Attributes:
        significant_patterns: One dict per strongly correlated column pair
            (keys ``"relationship"`` and ``"correlation"``).
        anomaly_events: One dict per outlier value (keys ``"index"``,
            ``"parameter"`` and ``"value"``).
    """

    # No hand-written __init__: an explicit __init__ suppresses the one
    # generated by @dataclass and makes the default factories unreachable.
    significant_patterns: list[Any] = field(default_factory=list)
    anomaly_events: list[Any] = field(default_factory=list)


class DataValidationError(Exception):
    """Raised when the input data is invalid."""


@dataclass
class LogEntry:
    """A single rover log record holding one reading per sensor field.

    ``timestamp`` is stored as given; it is neither validated nor used by
    ``analyze_data``.
    """

    timestamp: str
    luminosity: int
    sound_level: float
    temperature: float
    inference: float


# (attribute, accepted types, error message) in the order they are checked.
# NOTE(review): ``bool`` is a subclass of ``int`` and therefore passes all
# of these checks -- confirm whether that is intended.
_FIELD_RULES = (
    ("luminosity", int, "Luminosity muss int sein."),
    ("sound_level", (int, float), "Sound-Level muss numerisch sein."),
    ("temperature", (int, float), "Temperature muss numerisch sein."),
    ("inference", (int, float), "Inference muss numerisch sein."),
)


def _validate_log_entries(log_entries: List[LogEntry]) -> None:
    """Check that *log_entries* is a list of well-typed ``LogEntry`` objects.

    Raises:
        DataValidationError: If *log_entries* is not a list, an element is
            not a ``LogEntry``, or a sensor field has the wrong type.
    """
    if not isinstance(log_entries, list):
        raise DataValidationError("log_entries muss eine Liste sein.")
    for entry in log_entries:
        if not isinstance(entry, LogEntry):
            raise DataValidationError("Eintrag ist kein LogEntry-Objekt.")
        for attribute, accepted_types, message in _FIELD_RULES:
            if not isinstance(getattr(entry, attribute), accepted_types):
                raise DataValidationError(message)


def _find_correlated_pairs(df: pd.DataFrame, threshold: float) -> list[dict[str, Any]]:
    """Return one pattern dict per column pair with ``|corr| > threshold``."""
    corr = df.corr(numeric_only=True)
    patterns: list[dict[str, Any]] = []
    # combinations() visits every unordered pair exactly once, so a
    # symmetric correlation is not reported twice (A/B and again B/A).
    for first, second in combinations(corr.columns, 2):
        coefficient = corr.loc[first, second]
        # A NaN coefficient (e.g. for a constant column) compares False
        # here and is skipped.
        if abs(coefficient) > threshold:
            patterns.append(
                {
                    "relationship": f"High correlation between {first} and {second}",
                    "correlation": float(coefficient),
                }
            )
    return patterns


def _find_outliers(df: pd.DataFrame, sigma_factor: float) -> list[dict[str, Any]]:
    """Return one event dict per value outside ``mean +/- sigma_factor * stdev``."""
    events: list[dict[str, Any]] = []
    if len(df) < 2:
        # The sample standard deviation needs at least two data points.
        return events
    for column in _SENSOR_COLUMNS:
        series = df[column]
        values = series.tolist()
        # statistics.mean/stdev are kept on purpose: for a constant column
        # they return the exact mean and a spread of exactly 0.0.
        center = mean(values)
        spread = stdev(values)
        lower_bound = center - sigma_factor * spread
        upper_bound = center + sigma_factor * spread
        outliers = series[(series < lower_bound) | (series > upper_bound)]
        # Read values from the column itself (not from a DataFrame row) so
        # integer readings stay integers; tolist() yields Python scalars.
        for idx, value in zip(outliers.index.tolist(), outliers.tolist()):
            events.append({"index": int(idx), "parameter": column, "value": value})
    return events


def analyze_data(
    log_entries: List[LogEntry],
    *,
    correlation_threshold: float = 0.8,
    sigma_factor: float = 3.0,
) -> AnalysisResult:
    """Analyze rover log entries and detect patterns and anomalies.

    A pattern is reported once for every pair of sensor columns whose
    absolute correlation exceeds *correlation_threshold*. An anomaly is
    reported for every reading that lies more than *sigma_factor* sample
    standard deviations away from its column mean.

    NOTE: a single outlier among ``n`` samples can be at most
    ``(n - 1) / sqrt(n)`` sample standard deviations from the mean, so with
    the default factor of 3 nothing can be flagged for fewer than 11 entries.

    Args:
        log_entries: The log records to analyze. An empty list yields an
            empty result and logs a warning.
        correlation_threshold: Absolute correlation above which a column
            pair counts as a significant pattern.
        sigma_factor: Number of standard deviations defining the outlier
            band around the mean.

    Returns:
        An ``AnalysisResult`` with the detected patterns and anomalies.

    Raises:
        DataValidationError: If *log_entries* fails validation.
    """
    _validate_log_entries(log_entries)
    if not log_entries:
        logger.warning("Leere Eingabeliste übergeben.")
        return AnalysisResult([], [])

    df = pd.DataFrame(
        {
            "luminosity": [entry.luminosity for entry in log_entries],
            "sound": [entry.sound_level for entry in log_entries],
            "temperature": [entry.temperature for entry in log_entries],
            "inference": [entry.inference for entry in log_entries],
        }
    )

    significant_patterns = _find_correlated_pairs(df, correlation_threshold)
    anomaly_events = _find_outliers(df, sigma_factor)

    logger.info(
        "Analyse abgeschlossen: %d Muster, %d Anomalien",
        len(significant_patterns),
        len(anomaly_events),
    )
    return AnalysisResult(significant_patterns, anomaly_events)