Add python_sensor_logger/src/python_sensor_logger/core.py

This commit is contained in:
Mika 2026-04-12 02:07:07 +00:00
commit d385a4c602

View file

@ -0,0 +1,87 @@
import time
import json
import os
from datetime import datetime
from pathlib import Path
from typing import TypedDict, Any
import random
class TemperatureData(TypedDict):
"""Data model representing a temperature measurement."""
timestamp: float
temperature: float
class SensorReadError(Exception):
"""Raised when sensor reading fails or returns invalid data."""
pass
def read_temperature() -> float:
"""Liest die aktuelle Temperatur vom Sensor und gibt sie in Grad Celsius zurück.
Diese Implementierung simuliert den Sensorwert, da keine spezifische
Sensorschnittstelle in den Abhängigkeiten enthalten ist.
Raises:
SensorReadError: Wenn der Sensorwert ungültig ist oder nicht gelesen werden kann.
Returns:
float: Aktuelle Temperatur in Grad Celsius.
"""
try:
# Simulate sensor reading (e.g., DS18B20). In real usage, a sensor library would be used.
temperature = round(random.uniform(-10.0, 40.0), 2)
if not isinstance(temperature, float):
raise SensorReadError("Invalid sensor output type.")
if temperature < -100 or temperature > 150:
raise SensorReadError("Sensor reading out of plausible range.")
return temperature
except Exception as exc:
raise SensorReadError(f"Failed to read temperature: {exc}") from exc
def log_temperature(timestamp: float, temperature: float) -> None:
"""Schreibt eine neue Temperaturmessung mit Zeitstempel in eine JSON-Datei.
Wenn die Datei existiert, wird sie erweitert; andernfalls wird sie neu erstellt.
Args:
timestamp (float): UNIX-Zeitstempel der Messung.
temperature (float): Gemessene Temperatur in Grad Celsius.
"""
# Eingabevalidierung
if not isinstance(timestamp, (float, int)):
raise ValueError("timestamp muss vom Typ float oder int sein")
if not isinstance(temperature, (float, int)):
raise ValueError("temperature muss vom Typ float oder int sein")
data_entry: TemperatureData = {
"timestamp": float(timestamp),
"temperature": float(temperature)
}
output_path = Path("output/temperature_log.json")
output_path.parent.mkdir(parents=True, exist_ok=True)
# JSON-Datei lesen (wenn vorhanden)
records: list[dict[str, Any]] = []
if output_path.exists():
try:
with output_path.open("r", encoding="utf-8") as f:
records = json.load(f)
if not isinstance(records, list):
records = []
except json.JSONDecodeError:
records = []
# Neuen Eintrag hinzufügen
records.append(data_entry)
# JSON-Datei aktualisieren
with output_path.open("w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
# Zusätzliche Integritätsprüfung (CI-ready)
assert any(isinstance(r.get("temperature"), float) for r in records), "Ungültige Datenstruktur in Logdatei."