From 28825647e0e6bbe6f147b392e00f928819c2bde7 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 1 Mar 2026 03:11:37 +0000 Subject: [PATCH] Add sensor_logging/src/sensor_logging/core.py --- sensor_logging/src/sensor_logging/core.py | 73 +++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 sensor_logging/src/sensor_logging/core.py diff --git a/sensor_logging/src/sensor_logging/core.py b/sensor_logging/src/sensor_logging/core.py new file mode 100644 index 0000000..8b00748 --- /dev/null +++ b/sensor_logging/src/sensor_logging/core.py @@ -0,0 +1,73 @@ +import json +import os +from datetime import datetime +from pathlib import Path +from typing import Any + +try: + import smbus2 + import bme280 +except ImportError: + smbus2 = None # type: ignore + bme280 = None # type: ignore + + +class SensorData: + """Repräsentiert einen Datensatz einer BME280-Sensormessung.""" + + def __init__(self, sensor_id: str, temperature: float, humidity: float, timestamp: datetime | None = None) -> None: + self.sensor_id = sensor_id + self.temperature = temperature + self.humidity = humidity + self.timestamp = timestamp or datetime.utcnow() + self._validate() + + def _validate(self) -> None: + if not isinstance(self.sensor_id, str) or not self.sensor_id.strip(): + raise ValueError("sensor_id muss ein nicht-leerer String sein.") + if not isinstance(self.temperature, (float, int)): + raise ValueError("temperature muss eine Zahl sein.") + if not isinstance(self.humidity, (float, int)) or not (0.0 <= self.humidity <= 100.0): + raise ValueError("humidity muss eine Zahl zwischen 0 und 100 sein.") + + def to_json(self) -> str: + """Serialisiert Sensordaten als JSON-String.""" + return json.dumps( + { + "sensor_id": self.sensor_id, + "temperature": float(self.temperature), + "humidity": float(self.humidity), + "timestamp": self.timestamp.isoformat(), + }, + ensure_ascii=False, + ) + + +def log_sensor_data(sensor_id: str, temperature: float, humidity: float) -> None: + """Liest eine Messung eines BME280-Sensors aus und schreibt sie als JSON-Eintrag in eine Logdatei.""" + sensor_data = SensorData(sensor_id, temperature, humidity) + + output_dir = Path("output") + output_dir.mkdir(parents=True, exist_ok=True) + log_file = output_dir / "sensor_log.json" + + entry = json.loads(sensor_data.to_json()) + + # Append to log file (create if missing) + if not log_file.exists(): + with open(log_file, 'w', encoding='utf-8') as f: + json.dump([entry], f, ensure_ascii=False, indent=2) + else: + with open(log_file, 'r+', encoding='utf-8') as f: + try: + data: Any = json.load(f) + if not isinstance(data, list): + data = [] + except json.JSONDecodeError: + data = [] + data.append(entry) + f.seek(0) + json.dump(data, f, ensure_ascii=False, indent=2) + f.truncate() + + os.sync() \ No newline at end of file