Add gps_sync/src/gps_sync/core.py

This commit is contained in:
Mika 2026-04-26 02:07:52 +00:00
parent 7d45acbda2
commit 27fc424ad2

View file

@ -0,0 +1,84 @@
from __future__ import annotations
import pandas as pd
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any
@dataclass
class SyncedDataEntry:
timestamp: str
latitude: float
longitude: float
signal_strength: float
def sync_with_gps(gps_data: List[Dict[str, Any]], wifi_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Synchroniziert WLAN-Messdaten mit GPS-Daten basierend auf Timestamps.
Parameters
----------
gps_data : list[dict]
Liste der GPS-Datensätze mit Schlüsseln: 'timestamp', 'latitude', 'longitude'.
wifi_data : list[dict]
Liste der WLAN-Datensätze mit Schlüsseln: 'timestamp', 'signal_strength'.
Returns
-------
list[dict]
Liste von synchronisierten Datensätzen gemäß SyncedDataEntry.
"""
# Input validation
assert isinstance(gps_data, list), "gps_data must be a list"
assert isinstance(wifi_data, list), "wifi_data must be a list"
def validate_records(records: List[Dict[str, Any]], required_fields: List[str]) -> None:
for idx, rec in enumerate(records):
if not isinstance(rec, dict):
raise TypeError(f"Record at index {idx} must be a dict.")
for field in required_fields:
if field not in rec:
raise ValueError(f"Missing field '{field}' in record at index {idx}.")
validate_records(gps_data, ["timestamp", "latitude", "longitude"])
validate_records(wifi_data, ["timestamp", "signal_strength"])
# Konvertiere in DataFrame
gps_df = pd.DataFrame(gps_data).copy()
wifi_df = pd.DataFrame(wifi_data).copy()
# Konvertiere Zeitstempel in datetime
gps_df["timestamp"] = pd.to_datetime(gps_df["timestamp"], errors="coerce")
wifi_df["timestamp"] = pd.to_datetime(wifi_df["timestamp"], errors="coerce")
gps_df = gps_df.dropna(subset=["timestamp", "latitude", "longitude"])
wifi_df = wifi_df.dropna(subset=["timestamp", "signal_strength"])
# Sortieren
gps_df = gps_df.sort_values("timestamp")
wifi_df = wifi_df.sort_values("timestamp")
# Mergen nächstliegender Timestamp
synced = pd.merge_asof(
wifi_df,
gps_df,
on="timestamp",
direction="nearest",
tolerance=pd.Timedelta(seconds=5)
)
synced = synced.dropna(subset=["latitude", "longitude"])
# Rückgabe als Liste von Dicts nach SyncedDataEntry
result: List[Dict[str, Any]] = []
for _, row in synced.iterrows():
entry = SyncedDataEntry(
timestamp=row["timestamp"].isoformat(),
latitude=float(row["latitude"]),
longitude=float(row["longitude"]),
signal_strength=float(row["signal_strength"])
)
result.append(entry.__dict__)
return result