Add magnetometer_data_logger/src/magnetometer_data_logger/main.py

This commit is contained in:
Mika 2026-04-05 02:07:39 +00:00
parent 95783716b4
commit d7d4537408

View file

@ -0,0 +1,103 @@
import json
import os
import argparse
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Any
@dataclass
class SensorData:
"""Repräsentiert einen einzelnen Datensatz einer Magnetometer-Messung."""
timestamp: datetime
Bx: float
By: float
Bz: float
temperature: float
humidity: float
def to_serializable(self) -> dict[str, Any]:
"""Gibt eine JSON-serielle Repräsentation des Datensatzes zurück."""
return {
"timestamp": self.timestamp.isoformat(),
"Bx": self.Bx,
"By": self.By,
"Bz": self.Bz,
"temperature": self.temperature,
"humidity": self.humidity,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> 'SensorData':
"""Validiert und erstellt ein SensorData-Objekt aus einem Dictionary."""
required_fields = {"timestamp", "Bx", "By", "Bz", "temperature", "humidity"}
missing = required_fields - data.keys()
if missing:
raise ValueError(f"Fehlende Felder in SensorData: {', '.join(missing)}")
try:
timestamp = (
datetime.fromisoformat(data["timestamp"])
if isinstance(data["timestamp"], str)
else data["timestamp"]
)
Bx = float(data["Bx"])
By = float(data["By"])
Bz = float(data["Bz"])
temperature = float(data["temperature"])
humidity = float(data["humidity"])
except (ValueError, TypeError) as e:
raise ValueError(f"Ungültige Datentypen in SensorData: {e}") from e
return cls(timestamp=timestamp, Bx=Bx, By=By, Bz=Bz, temperature=temperature, humidity=humidity)
def log_data(sensor_data: SensorData, output_path: str = "output/magnetometer_log.json") -> None:
"""Schreibt Sensordaten in eine JSON-Logdatei."""
assert isinstance(sensor_data, SensorData), "sensor_data muss eine Instanz von SensorData sein."
assert isinstance(output_path, str) and output_path.strip(), "output_path muss ein gültiger Pfad sein."
log_entry = sensor_data.to_serializable()
os.makedirs(os.path.dirname(output_path), exist_ok=True)
existing_data = []
if os.path.exists(output_path):
with open(output_path, 'r', encoding='utf-8') as file:
try:
existing_data = json.load(file)
if not isinstance(existing_data, list):
existing_data = []
except json.JSONDecodeError:
existing_data = []
existing_data.append(log_entry)
with open(output_path, 'w', encoding='utf-8') as file:
json.dump(existing_data, file, indent=2)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Magnetometer Data Logger")
parser.add_argument(
"--output",
required=True,
help="Pfad zur Ausgabedatei, in der die Magnetometerdaten gespeichert werden.",
)
return parser.parse_args()
def main() -> None:
args = _parse_args()
try:
input_json = json.load(os.sys.stdin)
sensor_data = SensorData.from_dict(input_json)
log_data(sensor_data, args.output)
except Exception as exc:
os.sys.stderr.write(f"Fehler beim Verarbeiten der Sensordaten: {exc}\n")
os.sys.exit(1)
if __name__ == "__main__":
main()