Add logger_script/src/logger_script/core.py

This commit is contained in:
Mika 2026-02-22 03:07:03 +00:00
commit 6721b83137

View file

@ -0,0 +1,85 @@
import json
import time
import random
from datetime import datetime
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Dict
@dataclass
class LogEntry:
timestamp: str
intensity: float
_BUFFER: List[LogEntry] = []
_OUTPUT_PATH = Path("output/logs.json")
def _validate_intensity(value: float) -> float:
if not isinstance(value, (int, float)):
raise TypeError(f"Intensity must be a number, got {type(value)}")
return float(value)
def start_logging(duration: int) -> List[Dict[str, float]]:
"""Startet die Datenerfassung vom USB-Spektrometer.
In dieser Simulation werden Zufallswerte erzeugt, um reale Messungen zu imitieren.
"""
if not isinstance(duration, int) or duration <= 0:
raise ValueError("Duration must be a positive integer.")
global _BUFFER
_BUFFER.clear()
start_time = time.time()
while time.time() - start_time < duration:
intensity = random.uniform(0.0, 1000.0) # Simulated sensor value
intensity = _validate_intensity(intensity)
entry = LogEntry(timestamp=datetime.utcnow().isoformat(), intensity=intensity)
_BUFFER.append(entry)
time.sleep(1)
result = [asdict(e) for e in _BUFFER]
return result
def flush_buffer() -> None:
"""Schreibt gepufferte Daten sicher auf die Festplatte und leert den Zwischenspeicher."""
global _BUFFER
if not _BUFFER:
return
_OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
existing_data = []
if _OUTPUT_PATH.exists():
try:
with open(_OUTPUT_PATH, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except (json.JSONDecodeError, OSError):
existing_data = []
with open(_OUTPUT_PATH, 'w', encoding='utf-8') as f:
json.dump(existing_data + [asdict(e) for e in _BUFFER], f, indent=2)
_BUFFER.clear()
def moving_avg(data: List[float], window: int) -> List[float]:
"""Berechnet den gleitenden Mittelwert über eine Datenreihe."""
if not isinstance(data, list) or not all(isinstance(x, (int, float)) for x in data):
raise TypeError("Data must be a list of numeric values.")
if not isinstance(window, int) or window <= 0:
raise ValueError("Window must be a positive integer.")
if window > len(data):
raise ValueError("Window size cannot exceed data length.")
smoothed = []
for i in range(len(data) - window + 1):
segment = data[i:i + window]
avg_value = sum(segment) / window
smoothed.append(avg_value)
return smoothed