commit d2268f659c880544ee5ca523f454cd348bb6f6a7 Author: Mika Date: Sun Feb 15 03:06:25 2026 +0000 Add sensor_logging/src/sensor_logging/main.py diff --git a/sensor_logging/src/sensor_logging/main.py b/sensor_logging/src/sensor_logging/main.py new file mode 100644 index 0000000..2b39fd9 --- /dev/null +++ b/sensor_logging/src/sensor_logging/main.py @@ -0,0 +1,95 @@ +import json +import argparse +import os +from datetime import datetime +from pathlib import Path +from typing import Any, Dict + + +class SensorData: + """Repräsentiert einen Satz gemessener Sensordaten inklusive Zeitstempel.""" + + def __init__(self, voltage: float, temperature: float, humidity: float) -> None: + if not isinstance(voltage, (int, float)): + raise TypeError("voltage muss float sein") + if not isinstance(temperature, (int, float)): + raise TypeError("temperature muss float sein") + if not isinstance(humidity, (int, float)): + raise TypeError("humidity muss float sein") + + self.timestamp = datetime.now().isoformat() + self.voltage = float(voltage) + self.temperature = float(temperature) + self.humidity = float(humidity) + + def to_json(self) -> Dict[str, Any]: + """Serialisiert Sensordaten in ein JSON-kompatibles Dictionary.""" + return { + "timestamp": self.timestamp, + "voltage": self.voltage, + "temperature": self.temperature, + "humidity": self.humidity, + } + + +def log_sensor_data(voltage: float, temperature: float, humidity: float) -> None: + """Schreibt Sensordaten (Spannung, Temperatur, Luftfeuchte) mit Zeitstempel in eine Logdatei im JSON-Format.""" + data = SensorData(voltage=voltage, temperature=temperature, humidity=humidity) + log_entry = data.to_json() + + parser = argparse.ArgumentParser(description="Logge Sensordaten in eine JSON-Datei.") + parser.add_argument( + "--output", + required=False, + default="output/sensor_log.json", + help="Pfad zur Ausgabedatei für Sensordaten-Logs.", + ) + args, _ = parser.parse_known_args() + + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + + logs = [] + if output_path.exists(): + try: + with open(output_path, "r", encoding="utf-8") as f: + logs = json.load(f) + if not isinstance(logs, list): + logs = [] + except (json.JSONDecodeError, FileNotFoundError): + logs = [] + + logs.append(log_entry) + + with open(output_path, "w", encoding="utf-8") as f: + json.dump(logs, f, ensure_ascii=False, indent=2) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Erfasse und logge Sensordaten.") + parser.add_argument("--output", required=False, default="output/sensor_log.json", help="Pfad zur Logdatei.") + args = parser.parse_args() + + # Hier könnte reale Sensorabfrage erfolgen (Adafruit Bibliotheken), placeholder für echte Integration + try: + import Adafruit_DHT + import adafruit_ads1x15.ads1115 as ADS + from adafruit_ads1x15.analog_in import AnalogIn + from busio import I2C + import board + + DHT_SENSOR = Adafruit_DHT.DHT22 + DHT_PIN = 4 + humidity, temperature = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN) + i2c = I2C(board.SCL, board.SDA) + ads = ADS.ADS1115(i2c) + chan = AnalogIn(ads, ADS.P0) + voltage = chan.voltage * 1000.0 # in mV + + if None in (humidity, temperature, voltage): + raise ValueError("Ungültige Sensordaten erfasst") + + except Exception as e: + raise RuntimeError(f"Sensorabfrage fehlgeschlagen: {e}") from e + + log_sensor_data(voltage=voltage, temperature=temperature, humidity=humidity) \ No newline at end of file