From 27fc424ad20d09b76e5330b4797b6b730bda1da5 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 26 Apr 2026 02:07:52 +0000 Subject: [PATCH] Add gps_sync/src/gps_sync/core.py --- gps_sync/src/gps_sync/core.py | 84 +++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 gps_sync/src/gps_sync/core.py diff --git a/gps_sync/src/gps_sync/core.py b/gps_sync/src/gps_sync/core.py new file mode 100644 index 0000000..fdc3d6a --- /dev/null +++ b/gps_sync/src/gps_sync/core.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import pandas as pd +from dataclasses import dataclass +from datetime import datetime +from typing import List, Dict, Any + + +@dataclass +class SyncedDataEntry: + timestamp: str + latitude: float + longitude: float + signal_strength: float + + +def sync_with_gps(gps_data: List[Dict[str, Any]], wifi_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Synchroniziert WLAN-Messdaten mit GPS-Daten basierend auf Timestamps. + + Parameters + ---------- + gps_data : list[dict] + Liste der GPS-Datensätze mit Schlüsseln: 'timestamp', 'latitude', 'longitude'. + wifi_data : list[dict] + Liste der WLAN-Datensätze mit Schlüsseln: 'timestamp', 'signal_strength'. + + Returns + ------- + list[dict] + Liste von synchronisierten Datensätzen gemäß SyncedDataEntry. + """ + # Input validation + assert isinstance(gps_data, list), "gps_data must be a list" + assert isinstance(wifi_data, list), "wifi_data must be a list" + + def validate_records(records: List[Dict[str, Any]], required_fields: List[str]) -> None: + for idx, rec in enumerate(records): + if not isinstance(rec, dict): + raise TypeError(f"Record at index {idx} must be a dict.") + for field in required_fields: + if field not in rec: + raise ValueError(f"Missing field '{field}' in record at index {idx}.") + + validate_records(gps_data, ["timestamp", "latitude", "longitude"]) + validate_records(wifi_data, ["timestamp", "signal_strength"]) + + # Konvertiere in DataFrame + gps_df = pd.DataFrame(gps_data).copy() + wifi_df = pd.DataFrame(wifi_data).copy() + + # Konvertiere Zeitstempel in datetime + gps_df["timestamp"] = pd.to_datetime(gps_df["timestamp"], errors="coerce") + wifi_df["timestamp"] = pd.to_datetime(wifi_df["timestamp"], errors="coerce") + + gps_df = gps_df.dropna(subset=["timestamp", "latitude", "longitude"]) + wifi_df = wifi_df.dropna(subset=["timestamp", "signal_strength"]) + + # Sortieren + gps_df = gps_df.sort_values("timestamp") + wifi_df = wifi_df.sort_values("timestamp") + + # Mergen – nächstliegender Timestamp + synced = pd.merge_asof( + wifi_df, + gps_df, + on="timestamp", + direction="nearest", + tolerance=pd.Timedelta(seconds=5) + ) + + synced = synced.dropna(subset=["latitude", "longitude"]) + + # Rückgabe als Liste von Dicts nach SyncedDataEntry + result: List[Dict[str, Any]] = [] + for _, row in synced.iterrows(): + entry = SyncedDataEntry( + timestamp=row["timestamp"].isoformat(), + latitude=float(row["latitude"]), + longitude=float(row["longitude"]), + signal_strength=float(row["signal_strength"]) + ) + result.append(entry.__dict__) + + return result