Add gps_sync/src/gps_sync/cli.py
This commit is contained in:
parent
27fc424ad2
commit
eccc0d4b5f
1 changed files with 58 additions and 0 deletions
58
gps_sync/src/gps_sync/cli.py
Normal file
58
gps_sync/src/gps_sync/cli.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd # required per dependencies
|
||||
|
||||
from gps_sync.core import sync_with_gps
|
||||
|
||||
|
||||
def _load_json(path: Path) -> list[dict[str, Any]]:
|
||||
with path.open('r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
if not isinstance(data, list):
|
||||
raise ValueError(f"Expected list of dict for JSON file: {path}")
|
||||
return data
|
||||
|
||||
|
||||
def _write_json(data: list[dict[str, Any]], path: Path) -> None:
|
||||
with path.open('w', encoding='utf-8') as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Synchronisiert WLAN-Messdaten mit GPS-Positionsdaten anhand von Zeitstempeln."
|
||||
)
|
||||
parser.add_argument('--gps', required=True, help='Pfad zur GPS-JSON-Datei')
|
||||
parser.add_argument('--wifi', required=True, help='Pfad zur WLAN-JSON-Datei')
|
||||
parser.add_argument('--out', required=True, help='Pfad zur Ausgabe-JSON-Datei mit synchronisierten Daten')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
gps_path = Path(args.gps)
|
||||
wifi_path = Path(args.wifi)
|
||||
out_path = Path(args.out)
|
||||
|
||||
assert gps_path.exists(), f"GPS-Datei nicht gefunden: {gps_path}"
|
||||
assert wifi_path.exists(), f"WLAN-Datei nicht gefunden: {wifi_path}"
|
||||
|
||||
gps_data = _load_json(gps_path)
|
||||
wifi_data = _load_json(wifi_path)
|
||||
|
||||
synced_data = sync_with_gps(gps_data, wifi_data)
|
||||
|
||||
if not isinstance(synced_data, list):
|
||||
raise TypeError("sync_with_gps muss eine Liste von Dictionaries zurückgeben.")
|
||||
|
||||
# Validate fields based on data model SyncedDataEntry
|
||||
for entry in synced_data:
|
||||
if not all(k in entry for k in ("timestamp", "latitude", "longitude", "signal_strength")):
|
||||
raise ValueError("Ungültiger Dateneintrag im synchronisierten Ergebnis.")
|
||||
|
||||
_write_json(synced_data, out_path)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Loading…
Reference in a new issue