From 8bd7948eacac204bf50818c252765c12c28e1643 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 29 Mar 2026 03:07:26 +0000 Subject: [PATCH] Add logger/src/logger/core.py --- logger/src/logger/core.py | 106 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 logger/src/logger/core.py diff --git a/logger/src/logger/core.py b/logger/src/logger/core.py new file mode 100644 index 0000000..1f60f27 --- /dev/null +++ b/logger/src/logger/core.py @@ -0,0 +1,106 @@ +import json +import time +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List + + +class LogEntry: + """Repräsentiert einen einzelnen Messwert des Sensors und seine Metadaten.""" + + def __init__( + self, + timestamp: datetime, + led_id: str, + lumens: float, + peak_wavelength: float, + latitude: float, + longitude: float, + ) -> None: + if not isinstance(timestamp, datetime): + raise TypeError("timestamp must be a datetime instance") + if not isinstance(led_id, str): + raise TypeError("led_id must be a string") + for val, name in [ + (lumens, "lumens"), + (peak_wavelength, "peak_wavelength"), + (latitude, "latitude"), + (longitude, "longitude"), + ]: + if not isinstance(val, (int, float)): + raise TypeError(f"{name} must be a numeric type") + + self.timestamp = timestamp + self.led_id = led_id + self.lumens = float(lumens) + self.peak_wavelength = float(peak_wavelength) + self.latitude = float(latitude) + self.longitude = float(longitude) + + def to_dict(self) -> Dict[str, Any]: + """Konvertiert den Log-Eintrag in ein Dictionary zur JSON-Serialisierung.""" + return { + "timestamp": self.timestamp.isoformat(), + "led_id": self.led_id, + "lumens": self.lumens, + "peak_wavelength": self.peak_wavelength, + "latitude": self.latitude, + "longitude": self.longitude, + } + + +def _simulate_sensor_reading() -> LogEntry: + """Erzeugt simulierte Sensordaten für Test- und Entwicklungszwecke.""" + from random import uniform, choice + + led_id = choice(["LED_A", "LED_B", "LED_C"]) + lumens = round(uniform(100.0, 1000.0), 2) + peak_wavelength = round(uniform(400.0, 700.0), 1) + latitude = round(uniform(-90.0, 90.0), 6) + longitude = round(uniform(-180.0, 180.0), 6) + return LogEntry( + timestamp=datetime.utcnow(), + led_id=led_id, + lumens=lumens, + peak_wavelength=peak_wavelength, + latitude=latitude, + longitude=longitude, + ) + + +def start_logging(interval: float, duration: float) -> str: + """Startet die periodische Aufzeichnung von Sensordaten über einen gegebenen Zeitraum. + + Args: + interval: Zeitintervall zwischen zwei Messungen in Sekunden. + duration: Gesamtzeit der Aufzeichnung in Sekunden. + + Returns: + Pfad zur erzeugten JSON-Logdatei mit den aufgezeichneten Messungen. + """ + if not isinstance(interval, (int, float)) or interval <= 0: + raise ValueError("interval must be a positive number") + if not isinstance(duration, (int, float)) or duration <= 0: + raise ValueError("duration must be a positive number") + + output_dir = Path("output/logs") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp_str = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + log_path = output_dir / f"session_log_{timestamp_str}.json" + + entries: List[Dict[str, Any]] = [] + + start_time = time.time() + while (time.time() - start_time) < duration: + entry = _simulate_sensor_reading() + entries.append(entry.to_dict()) + time.sleep(interval) + + with open(log_path, "w", encoding="utf-8") as f: + json.dump(entries, f, ensure_ascii=False, indent=2) + + # Validation assertion for CI safety + assert log_path.exists(), "Log file should have been created." + + return str(log_path)