Add data_logging/src/data_logging/cli.py
This commit is contained in:
parent
0f973457a4
commit
4e14edeb0a
1 changed files with 67 additions and 0 deletions
67
data_logging/src/data_logging/cli.py
Normal file
67
data_logging/src/data_logging/cli.py
Normal file
|
|
@ -0,0 +1,67 @@
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
from data_logging import core
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_log_entry(entry: Dict[str, Any]) -> bool:
|
||||||
|
"""Validate a single log entry structure."""
|
||||||
|
required_fields = {"timestamp": str, "frequency": (float, int), "signal_strength": (float, int)}
|
||||||
|
for field, expected_type in required_fields.items():
|
||||||
|
if field not in entry:
|
||||||
|
return False
|
||||||
|
if not isinstance(entry[field], expected_type):
|
||||||
|
return False
|
||||||
|
# Further timestamp validation
|
||||||
|
try:
|
||||||
|
datetime.fromisoformat(entry["timestamp"])
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _load_stdin_data() -> list[Dict[str, Any]]:
|
||||||
|
"""Loads JSON data from standard input and validates entries."""
|
||||||
|
try:
|
||||||
|
data = json.load(sys.stdin)
|
||||||
|
except json.JSONDecodeError as exc:
|
||||||
|
raise SystemExit(f"Invalid JSON input: {exc}")
|
||||||
|
|
||||||
|
if not isinstance(data, list):
|
||||||
|
raise SystemExit("Input JSON must be a list of measurement entries.")
|
||||||
|
|
||||||
|
for idx, entry in enumerate(data):
|
||||||
|
if not _validate_log_entry(entry):
|
||||||
|
raise SystemExit(f"Invalid entry at index {idx}: {entry}")
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""CLI entrypoint for WLAN data logging."""
|
||||||
|
parser = argparse.ArgumentParser(description="WLAN Signal Logging CLI")
|
||||||
|
parser.add_argument(
|
||||||
|
"--output",
|
||||||
|
required=True,
|
||||||
|
help="Pfad der JSON-Datei, in die die Messungen geschrieben werden",
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
output_path = Path(args.output)
|
||||||
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
measurements = _load_stdin_data()
|
||||||
|
|
||||||
|
for entry in measurements:
|
||||||
|
core.log_wifi_data(
|
||||||
|
timestamp=entry["timestamp"],
|
||||||
|
frequency=float(entry["frequency"]),
|
||||||
|
signal_strength=float(entry["signal_strength"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in a new issue