Add sensor_logging/src/sensor_logging/core.py

This commit is contained in:
Mika 2026-03-01 03:11:37 +00:00
commit 28825647e0

View file

@ -0,0 +1,73 @@
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any
try:
import smbus2
import bme280
except ImportError:
smbus2 = None # type: ignore
bme280 = None # type: ignore
class SensorData:
"""Repräsentiert einen Datensatz einer BME280-Sensormessung."""
def __init__(self, sensor_id: str, temperature: float, humidity: float, timestamp: datetime | None = None) -> None:
self.sensor_id = sensor_id
self.temperature = temperature
self.humidity = humidity
self.timestamp = timestamp or datetime.utcnow()
self._validate()
def _validate(self) -> None:
if not isinstance(self.sensor_id, str) or not self.sensor_id.strip():
raise ValueError("sensor_id muss ein nicht-leerer String sein.")
if not isinstance(self.temperature, (float, int)):
raise ValueError("temperature muss eine Zahl sein.")
if not isinstance(self.humidity, (float, int)) or not (0.0 <= self.humidity <= 100.0):
raise ValueError("humidity muss eine Zahl zwischen 0 und 100 sein.")
def to_json(self) -> str:
"""Serialisiert Sensordaten als JSON-String."""
return json.dumps(
{
"sensor_id": self.sensor_id,
"temperature": float(self.temperature),
"humidity": float(self.humidity),
"timestamp": self.timestamp.isoformat(),
},
ensure_ascii=False,
)
def log_sensor_data(sensor_id: str, temperature: float, humidity: float) -> None:
"""Liest eine Messung eines BME280-Sensors aus und schreibt sie als JSON-Eintrag in eine Logdatei."""
sensor_data = SensorData(sensor_id, temperature, humidity)
output_dir = Path("output")
output_dir.mkdir(parents=True, exist_ok=True)
log_file = output_dir / "sensor_log.json"
entry = json.loads(sensor_data.to_json())
# Append to log file (create if missing)
if not log_file.exists():
with open(log_file, 'w', encoding='utf-8') as f:
json.dump([entry], f, ensure_ascii=False, indent=2)
else:
with open(log_file, 'r+', encoding='utf-8') as f:
try:
data: Any = json.load(f)
if not isinstance(data, list):
data = []
except json.JSONDecodeError:
data = []
data.append(entry)
f.seek(0)
json.dump(data, f, ensure_ascii=False, indent=2)
f.truncate()
os.sync()