Add data_logging/src/data_logging/core.py
This commit is contained in:
parent
ab2abee274
commit
58790fbfd1
1 changed files with 75 additions and 0 deletions
75
data_logging/src/data_logging/core.py
Normal file
75
data_logging/src/data_logging/core.py
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import os
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from pydantic import BaseModel, ValidationError, validator
|
||||
|
||||
|
||||
class LogEntry(BaseModel):
|
||||
"""Repräsentiert einen einzelnen Logeintrag für ein Δt<0-Ereignis."""
|
||||
|
||||
timestamp: datetime
|
||||
value: float
|
||||
status: str
|
||||
|
||||
@validator("status")
|
||||
def validate_status(cls, v: str) -> str:
|
||||
if not v:
|
||||
raise ValueError("status darf nicht leer sein")
|
||||
return v
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"timestamp": self.timestamp.isoformat(),
|
||||
"value": self.value,
|
||||
"status": self.status,
|
||||
}
|
||||
|
||||
|
||||
def log_data(data: List[LogEntry], filename: str) -> bool:
|
||||
"""Protokolliert eine Liste von Datenereignissen in eine CSV-Datei."""
|
||||
assert isinstance(filename, str), "filename muss ein String sein"
|
||||
assert isinstance(data, list), "data muss eine Liste sein"
|
||||
|
||||
path = Path(filename)
|
||||
try:
|
||||
# Sicherstellen, dass die Zieldirectory existiert
|
||||
if not path.parent.exists():
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Validierung jedes Elements
|
||||
valid_entries = []
|
||||
for entry in data:
|
||||
if isinstance(entry, dict):
|
||||
try:
|
||||
entry_obj = LogEntry(**entry)
|
||||
except ValidationError as e:
|
||||
continue # ungültigen Eintrag überspringen
|
||||
valid_entries.append(entry_obj)
|
||||
elif isinstance(entry, LogEntry):
|
||||
valid_entries.append(entry)
|
||||
else:
|
||||
continue
|
||||
|
||||
if not valid_entries:
|
||||
return False
|
||||
|
||||
file_exists = path.exists()
|
||||
|
||||
with open(path, mode="a", newline="", encoding="utf-8") as csvfile:
|
||||
fieldnames = ["timestamp", "value", "status"]
|
||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||
|
||||
if not file_exists:
|
||||
writer.writeheader()
|
||||
|
||||
for entry in valid_entries:
|
||||
writer.writerow(entry.to_dict())
|
||||
|
||||
return True
|
||||
|
||||
except (OSError, IOError):
|
||||
return False
|
||||
Loading…
Reference in a new issue