Add data_logging/src/data_logging/core.py

This commit is contained in:
Mika 2026-05-03 02:07:39 +00:00
parent 080dfb34a9
commit ab14a72124

View file

@ -0,0 +1,64 @@
from __future__ import annotations
import pandas as pd
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')
@dataclass
class LogEntry:
"""Repräsentiert eine einzelne Sensordatenerfassung."""
timestamp: str
water_level: float
ground_vibration: float
ai_label: str
def __post_init__(self):
# Eingabevalidierung entsprechend der Definition
if not isinstance(self.timestamp, str):
raise TypeError("timestamp muss ein String sein (UTC-Zeitstempel).")
if not isinstance(self.water_level, (int, float)):
raise TypeError("water_level muss numerisch sein.")
if not isinstance(self.ground_vibration, (int, float)):
raise TypeError("ground_vibration muss numerisch sein.")
if not isinstance(self.ai_label, str):
raise TypeError("ai_label muss ein String sein.")
def export_to_csv(data: List[LogEntry]) -> str:
"""Exportiert gesammelte Logdaten in eine CSV-Datei.
Args:
data: Liste von LogEntry-Strukturen, die Umweltsensordaten enthalten.
Returns:
Der Pfad zur exportierten CSV-Datei als String.
"""
if not data:
raise ValueError("Datenliste darf nicht leer sein.")
if not all(isinstance(entry, LogEntry) for entry in data):
raise TypeError("Alle Einträge müssen vom Typ LogEntry sein.")
logger.info("Beginne CSV-Export von %d Einträgen", len(data))
df = pd.DataFrame([asdict(entry) for entry in data])
timestamp_str = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
output_dir = Path('output')
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f'environment_log_{timestamp_str}.csv'
df.to_csv(output_path, index=False)
logger.info("Daten erfolgreich in %s exportiert", output_path)
assert output_path.is_file(), "Exportierte CSV-Datei wurde nicht erzeugt."
return str(output_path)