commit 6721b831375fd3f26f680aa21742f50cc6f0174a Author: Mika Date: Sun Feb 22 03:07:03 2026 +0000 Add logger_script/src/logger_script/core.py diff --git a/logger_script/src/logger_script/core.py b/logger_script/src/logger_script/core.py new file mode 100644 index 0000000..da33858 --- /dev/null +++ b/logger_script/src/logger_script/core.py @@ -0,0 +1,85 @@ +import json +import time +import random +from datetime import datetime +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import List, Dict + + +@dataclass +class LogEntry: + timestamp: str + intensity: float + + +_BUFFER: List[LogEntry] = [] +_OUTPUT_PATH = Path("output/logs.json") + + +def _validate_intensity(value: float) -> float: + if not isinstance(value, (int, float)): + raise TypeError(f"Intensity must be a number, got {type(value)}") + return float(value) + + +def start_logging(duration: int) -> List[Dict[str, float]]: + """Startet die Datenerfassung vom USB-Spektrometer. + + In dieser Simulation werden Zufallswerte erzeugt, um reale Messungen zu imitieren. + """ + if not isinstance(duration, int) or duration <= 0: + raise ValueError("Duration must be a positive integer.") + + global _BUFFER + _BUFFER.clear() + start_time = time.time() + + while time.time() - start_time < duration: + intensity = random.uniform(0.0, 1000.0) # Simulated sensor value + intensity = _validate_intensity(intensity) + entry = LogEntry(timestamp=datetime.utcnow().isoformat(), intensity=intensity) + _BUFFER.append(entry) + time.sleep(1) + + result = [asdict(e) for e in _BUFFER] + return result + + +def flush_buffer() -> None: + """Schreibt gepufferte Daten sicher auf die Festplatte und leert den Zwischenspeicher.""" + global _BUFFER + if not _BUFFER: + return + + _OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) + existing_data = [] + + if _OUTPUT_PATH.exists(): + try: + with open(_OUTPUT_PATH, 'r', encoding='utf-8') as f: + existing_data = json.load(f) + except (json.JSONDecodeError, OSError): + existing_data = [] + + with open(_OUTPUT_PATH, 'w', encoding='utf-8') as f: + json.dump(existing_data + [asdict(e) for e in _BUFFER], f, indent=2) + + _BUFFER.clear() + + +def moving_avg(data: List[float], window: int) -> List[float]: + """Berechnet den gleitenden Mittelwert über eine Datenreihe.""" + if not isinstance(data, list) or not all(isinstance(x, (int, float)) for x in data): + raise TypeError("Data must be a list of numeric values.") + if not isinstance(window, int) or window <= 0: + raise ValueError("Window must be a positive integer.") + if window > len(data): + raise ValueError("Window size cannot exceed data length.") + + smoothed = [] + for i in range(len(data) - window + 1): + segment = data[i:i + window] + avg_value = sum(segment) / window + smoothed.append(avg_value) + return smoothed