Add data_analysis/src/data_analysis/core.py

This commit is contained in:
Mika 2026-03-08 03:07:06 +00:00
parent e95a204761
commit 6b0119f042

View file

@@ -0,0 +1,98 @@
from __future__ import annotations
import logging
from typing import List, Any
from dataclasses import dataclass, field
import pandas as pd
import numpy as np
from statistics import mean, stdev
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Gives this logger its own INFO level at import time, so its effective level
# no longer follows the level configured on parent loggers.
# NOTE(review): library modules usually leave level configuration to the
# application - confirm this override is intended.
logger.setLevel(logging.INFO)
@dataclass
class AnalysisResult:
    """Container for the results of the sensor data analysis.

    Both fields default to a fresh empty list, so ``AnalysisResult()`` is a
    valid "nothing found" result. The constructor is the one generated by
    ``@dataclass``; it accepts the two lists positionally or by keyword.
    """

    # Detected patterns (e.g. strongly correlated sensor channels).
    significant_patterns: list[Any] = field(default_factory=list)
    # Detected anomaly events (e.g. readings outside the expected band).
    anomaly_events: list[Any] = field(default_factory=list)
class DataValidationError(Exception):
    """Raised when input data is invalid."""
@dataclass
class LogEntry:
    """One log record holding a timestamp and four sensor/model readings.

    Units and valid ranges are not defined in this module.
    """

    # Time of the record as a string; the format is not checked by the
    # validation code visible in this file.
    timestamp: str
    # Light reading; _validate_log_entries requires an int here.
    luminosity: int
    # Sound reading; int or float is accepted by _validate_log_entries.
    sound_level: float
    # Temperature reading; int or float is accepted by _validate_log_entries.
    temperature: float
    # Inference value; int or float is accepted by _validate_log_entries.
    # NOTE(review): presumably a model score or timing - confirm with the producer.
    inference: float
def _validate_log_entries(log_entries: List[LogEntry]) -> None:
if not isinstance(log_entries, list):
raise DataValidationError("log_entries muss eine Liste sein.")
for entry in log_entries:
if not isinstance(entry, LogEntry):
raise DataValidationError("Eintrag ist kein LogEntry-Objekt.")
if not isinstance(entry.luminosity, int):
raise DataValidationError("Luminosity muss int sein.")
if not isinstance(entry.sound_level, (int, float)):
raise DataValidationError("Sound-Level muss numerisch sein.")
if not isinstance(entry.temperature, (int, float)):
raise DataValidationError("Temperature muss numerisch sein.")
if not isinstance(entry.inference, (int, float)):
raise DataValidationError("Inference muss numerisch sein.")
def _find_correlation_patterns(df: pd.DataFrame, threshold: float) -> list[dict[str, Any]]:
    """Return one pattern per unordered column pair with ``|correlation| > threshold``."""
    corr = df.corr(numeric_only=True)
    columns = list(corr.columns)
    patterns: list[dict[str, Any]] = []
    for position, col1 in enumerate(columns):
        # The correlation matrix is symmetric, so only the upper triangle is
        # visited; walking the full matrix would report every pair twice.
        for col2 in columns[position + 1:]:
            coefficient = float(corr.loc[col1, col2])
            # NaN (e.g. a constant column) fails the comparison and is skipped.
            if abs(coefficient) > threshold:
                patterns.append(
                    {
                        "relationship": f"High correlation between {col1} and {col2}",
                        "correlation": coefficient,
                    }
                )
    return patterns


def _find_anomalies(df: pd.DataFrame, sigma_factor: float) -> list[dict[str, Any]]:
    """Return one event per value outside ``mean +/- sigma_factor * stdev`` of its column."""
    events: list[dict[str, Any]] = []
    # The sample standard deviation needs at least two data points.
    if len(df) < 2:
        return events
    for column in df.columns:
        series = df[column]
        center = mean(series)
        spread = stdev(series)
        lower_bound = center - sigma_factor * spread
        upper_bound = center + sigma_factor * spread
        outliers = series[(series < lower_bound) | (series > upper_bound)]
        # Values are read from the column itself (not via DataFrame.iterrows,
        # which upcasts a mixed int/float row to float), and tolist() yields
        # plain Python scalars, so an int luminosity stays an int.
        for idx, value in zip(outliers.index, outliers.tolist()):
            events.append({"index": int(idx), "parameter": column, "value": value})
    return events


def analyze_data(
    log_entries: List[LogEntry],
    *,
    correlation_threshold: float = 0.8,
    sigma_factor: float = 3.0,
) -> AnalysisResult:
    """Analyze a list of rover log entries and detect patterns and anomalies.

    Two analyses run over the columns luminosity, sound, temperature and
    inference (timestamps are not used):

    * Patterns: every pair of columns whose absolute correlation coefficient
      exceeds ``correlation_threshold`` is reported once as
      ``{"relationship": str, "correlation": float}``.
    * Anomalies: every value outside ``mean +/- sigma_factor * stdev`` of its
      column is reported as ``{"index": int, "parameter": str, "value": number}``,
      where ``index`` is the position of the entry in ``log_entries``.

    Note: with the sample standard deviation, a single value can deviate from
    the mean by at most ``(n - 1) / sqrt(n)`` standard deviations, so with the
    default factor of 3 no anomaly can be flagged for fewer than 11 entries.

    Args:
        log_entries: Entries to analyze; validated before use.
        correlation_threshold: Absolute correlation above which a column pair
            counts as a significant pattern.
        sigma_factor: Half-width of the accepted band in standard deviations.

    Returns:
        An ``AnalysisResult`` holding the pattern and anomaly lists; both are
        empty when ``log_entries`` is empty.

    Raises:
        DataValidationError: If ``log_entries`` fails validation.
    """
    _validate_log_entries(log_entries)
    if not log_entries:
        logger.warning("Leere Eingabeliste übergeben.")
        return AnalysisResult([], [])

    df = pd.DataFrame(
        {
            "luminosity": [entry.luminosity for entry in log_entries],
            "sound": [entry.sound_level for entry in log_entries],
            "temperature": [entry.temperature for entry in log_entries],
            "inference": [entry.inference for entry in log_entries],
        }
    )

    significant_patterns = _find_correlation_patterns(df, correlation_threshold)
    anomaly_events = _find_anomalies(df, sigma_factor)

    logger.info(
        "Analyse abgeschlossen: %d Muster, %d Anomalien",
        len(significant_patterns),
        len(anomaly_events),
    )
    return AnalysisResult(significant_patterns, anomaly_events)