Add data_analysis/src/data_analysis/main.py

This commit is contained in:
Mika 2026-03-01 03:11:39 +00:00
parent cb98748521
commit a7e12f37c2

View file

@ -0,0 +1,80 @@
from __future__ import annotations
import json
from dataclasses import dataclass, asdict
from typing import List, Union
import pandas as pd
from statistics import mean
from datetime import datetime
@dataclass
class AnalysisResult:
"""Speichert Ergebnisse der Kondensationsanalyse."""
average_diameter: float
condensation_rates: List[float]
def to_json(self) -> str:
"""Serialisiert das Analyseergebnis als JSON-String."""
return json.dumps(asdict(self), ensure_ascii=False)
def analyze_condensation(data: Union[pd.DataFrame, list[dict]]) -> AnalysisResult:
"""Analysiert Kondensationsdaten und berechnet Durchschnittsdurchmesser und Raten.
Args:
data: Messdaten als pandas.DataFrame oder Liste von Dicts
Returns:
AnalysisResult: Aggregierte Analyseergebnisse
"""
# Validierung der Eingabe
if isinstance(data, list):
if not data:
raise ValueError("Leere Datenliste wurde übergeben.")
df = pd.DataFrame(data)
elif isinstance(data, pd.DataFrame):
df = data.copy()
else:
raise TypeError("Daten müssen entweder ein pandas.DataFrame oder eine Liste von Dicts sein.")
required_columns = {"time", "surface_type", "average_diameter", "minutes_since_start"}
if not required_columns.issubset(df.columns):
raise ValueError(f"Fehlende erforderliche Spalten: {required_columns - set(df.columns)}")
# Datentypprüfung für kritische Spalten
try:
df['average_diameter'] = df['average_diameter'].astype(float)
df['minutes_since_start'] = df['minutes_since_start'].astype(int)
except Exception as e:
raise ValueError(f"Ungültige Datentypen in den Eingabedaten: {e}") from e
# Umwandlung von Zeitwerten (optional, falls String)
if not pd.api.types.is_datetime64_any_dtype(df['time']):
try:
df['time'] = pd.to_datetime(df['time'])
except Exception as e:
raise ValueError(f"Ungültige Zeitwerte in Spalte 'time': {e}") from e
# Durchschnittlicher Tropfendurchmesser über alles
avg_diameter = df['average_diameter'].mean()
# Berechnung der Kondensationsraten pro Oberfläche
condensation_rates: List[float] = []
for surface, group in df.groupby('surface_type'):
group = group.sort_values('minutes_since_start')
if len(group) < 2:
condensation_rates.append(0.0)
continue
t_diff = group['minutes_since_start'].iloc[-1] - group['minutes_since_start'].iloc[0]
d_diff = group['average_diameter'].iloc[-1] - group['average_diameter'].iloc[0]
rate = d_diff / t_diff if t_diff != 0 else 0.0
condensation_rates.append(rate)
result = AnalysisResult(average_diameter=avg_diameter, condensation_rates=condensation_rates)
# CI-Prüfungen / Assertions
assert isinstance(result.average_diameter, float), "average_diameter muss float sein."
assert all(isinstance(r, float) for r in result.condensation_rates), "Alle condensation_rates müssen float sein."
return result