Add data_logger/src/data_logger/cli.py

This commit is contained in:
Mika 2026-02-08 03:06:29 +00:00
parent a273c91c59
commit 3fa3637d98

View file

@ -0,0 +1,70 @@
import argparse
import json
from pathlib import Path
from datetime import datetime
from typing import List
from data_logger.core import log_sensor_data, SensorData
def _load_sensor_data(json_path: Path) -> List[SensorData]:
if not json_path.exists():
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {json_path}")
with json_path.open('r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("JSON muss eine Liste von SensorData-Objekten enthalten.")
sensor_values: List[SensorData] = []
for entry in data:
try:
ts_raw = entry.get('timestamp')
if ts_raw is None:
raise ValueError('timestamp fehlt')
if isinstance(ts_raw, str):
timestamp = datetime.fromisoformat(ts_raw)
else:
raise TypeError('timestamp muss ISO-String sein')
temperature = float(entry['temperature'])
vibration_amplitude = float(entry['vibration_amplitude'])
humidity = float(entry['humidity'])
sensor_values.append(SensorData(
timestamp=timestamp,
temperature=temperature,
vibration_amplitude=vibration_amplitude,
humidity=humidity,
))
except (KeyError, TypeError, ValueError) as e:
raise ValueError(f'Ungültiges Sensordaten-Objekt: {entry}') from e
return sensor_values
def main() -> None:
parser = argparse.ArgumentParser(description='CLI für den Data Logger.')
parser.add_argument('--input', required=True, help='Pfad zur Eingabe-JSON-Datei mit Sensordaten.')
parser.add_argument('--output', required=True, help='Pfad, unter dem die CSV-Datei gespeichert wird.')
args = parser.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
sensor_values = _load_sensor_data(input_path)
# CSV-Datei erstellen
csv_path = log_sensor_data(sensor_values)
# Ausgabe-CSV an Zielort verschieben, falls nötig
if Path(csv_path) != output_path:
Path(csv_path).replace(output_path)
print(f'Daten erfolgreich geloggt: {output_path}')
if __name__ == '__main__':
main()