From 58790fbfd10fad7af4e6f97bb7f7a21c253548ad Mon Sep 17 00:00:00 2001 From: Mika Date: Thu, 5 Mar 2026 15:48:03 +0000 Subject: [PATCH] Add data_logging/src/data_logging/core.py --- data_logging/src/data_logging/core.py | 75 +++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 data_logging/src/data_logging/core.py diff --git a/data_logging/src/data_logging/core.py b/data_logging/src/data_logging/core.py new file mode 100644 index 0000000..ef70849 --- /dev/null +++ b/data_logging/src/data_logging/core.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import csv +import os +from datetime import datetime +from pathlib import Path +from typing import List +from pydantic import BaseModel, ValidationError, validator + + +class LogEntry(BaseModel): + """Repräsentiert einen einzelnen Logeintrag für ein Δt<0-Ereignis.""" + + timestamp: datetime + value: float + status: str + + @validator("status") + def validate_status(cls, v: str) -> str: + if not v: + raise ValueError("status darf nicht leer sein") + return v + + def to_dict(self) -> dict: + return { + "timestamp": self.timestamp.isoformat(), + "value": self.value, + "status": self.status, + } + + +def log_data(data: List[LogEntry], filename: str) -> bool: + """Protokolliert eine Liste von Datenereignissen in eine CSV-Datei.""" + assert isinstance(filename, str), "filename muss ein String sein" + assert isinstance(data, list), "data muss eine Liste sein" + + path = Path(filename) + try: + # Sicherstellen, dass die Zieldirectory existiert + if not path.parent.exists(): + path.parent.mkdir(parents=True, exist_ok=True) + + # Validierung jedes Elements + valid_entries = [] + for entry in data: + if isinstance(entry, dict): + try: + entry_obj = LogEntry(**entry) + except ValidationError as e: + continue # ungültigen Eintrag überspringen + valid_entries.append(entry_obj) + elif isinstance(entry, LogEntry): + valid_entries.append(entry) + else: + continue + + if not valid_entries: + return False + + file_exists = path.exists() + + with open(path, mode="a", newline="", encoding="utf-8") as csvfile: + fieldnames = ["timestamp", "value", "status"] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + + if not file_exists: + writer.writeheader() + + for entry in valid_entries: + writer.writerow(entry.to_dict()) + + return True + + except (OSError, IOError): + return False