Add data_logger/src/data_logger/core.py
This commit is contained in:
commit
a273c91c59
1 changed files with 59 additions and 0 deletions
59
data_logger/src/data_logger/core.py
Normal file
59
data_logger/src/data_logger/core.py
Normal file
|
|
@ -0,0 +1,59 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
import csv
|
||||||
|
from dataclasses import dataclass, asdict
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SensorData:
|
||||||
|
"""Repräsentiert eine einzelne Sensormessung mit Zeitstempel."""
|
||||||
|
|
||||||
|
timestamp: datetime
|
||||||
|
temperature: float
|
||||||
|
vibration_amplitude: float
|
||||||
|
humidity: float
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
# Input-Validierung zur Sicherstellung von Datentypen und sinnvoller Werte.
|
||||||
|
if not isinstance(self.timestamp, datetime):
|
||||||
|
raise TypeError("timestamp muss ein datetime-Objekt sein.")
|
||||||
|
for field_name in ("temperature", "vibration_amplitude", "humidity"):
|
||||||
|
value = getattr(self, field_name)
|
||||||
|
if not isinstance(value, (float, int)):
|
||||||
|
raise TypeError(f"{field_name} muss numerisch sein.")
|
||||||
|
assert -100.0 <= self.temperature <= 200.0, "Temperatur außerhalb des zulässigen Bereichs."
|
||||||
|
assert 0.0 <= self.humidity <= 100.0, "Luftfeuchtigkeit muss zwischen 0 und 100 liegen."
|
||||||
|
|
||||||
|
|
||||||
|
def log_sensor_data(sensor_values: List[SensorData]) -> str:
|
||||||
|
"""Speichert die übergebenen Sensordaten in einer CSV-Datei.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sensor_values: Liste von SensorData-Objekten, die geschrieben werden sollen.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Pfad zur erzeugten CSV-Datei als String.
|
||||||
|
"""
|
||||||
|
if not isinstance(sensor_values, list) or not all(isinstance(v, SensorData) for v in sensor_values):
|
||||||
|
raise TypeError("sensor_values muss eine Liste von SensorData-Objekten sein.")
|
||||||
|
|
||||||
|
if not sensor_values:
|
||||||
|
raise ValueError("sensor_values darf nicht leer sein.")
|
||||||
|
|
||||||
|
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
output_path = Path(f"output/logged_data_{timestamp_str}.csv")
|
||||||
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
fieldnames = ["timestamp", "temperature", "vibration_amplitude", "humidity"]
|
||||||
|
|
||||||
|
with output_path.open(mode="w", newline="", encoding="utf-8") as csvfile:
|
||||||
|
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||||
|
writer.writeheader()
|
||||||
|
for entry in sensor_values:
|
||||||
|
record = asdict(entry)
|
||||||
|
record["timestamp"] = entry.timestamp.isoformat()
|
||||||
|
writer.writerow(record)
|
||||||
|
|
||||||
|
return str(output_path)
|
||||||
Loading…
Reference in a new issue