Add data_logging/src/data_logging/cli.py
This commit is contained in:
parent
d9f5da5762
commit
f5a6edd86c
1 changed files with 64 additions and 0 deletions
64
data_logging/src/data_logging/cli.py
Normal file
64
data_logging/src/data_logging/cli.py
Normal file
|
|
@ -0,0 +1,64 @@
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
from data_logging.core import LogEntry, log_data
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_file_path(path_str: str, mode: str = 'r') -> Path:
|
||||||
|
"""Validates and returns a Path for given input/output file."""
|
||||||
|
path = Path(path_str)
|
||||||
|
if mode == 'r' and not path.exists():
|
||||||
|
sys.exit(f"Input file not found: {path}")
|
||||||
|
if mode == 'w':
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def _load_sensor_data(input_path: Path) -> list[dict[str, Any]]:
|
||||||
|
"""Loads sensor JSON data from file and validates structure minimally."""
|
||||||
|
with input_path.open('r', encoding='utf-8') as f:
|
||||||
|
try:
|
||||||
|
data = json.load(f)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
sys.exit(f"Invalid JSON format in {input_path}: {e}")
|
||||||
|
|
||||||
|
if isinstance(data, dict):
|
||||||
|
data = [data]
|
||||||
|
if not isinstance(data, list):
|
||||||
|
sys.exit("Input JSON must be an object or list of objects.")
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""CLI entry point for Rover Data Logging."""
|
||||||
|
parser = argparse.ArgumentParser(description="Rover Data Logging CLI")
|
||||||
|
parser.add_argument('--input', required=True, help='Pfad oder Datenquelle (JSON)')
|
||||||
|
parser.add_argument('--output', required=True, help='Pfad zur Ausgabedatei (CSV/JSON)')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
input_path = _validate_file_path(args.input, 'r')
|
||||||
|
output_path = _validate_file_path(args.output, 'w')
|
||||||
|
|
||||||
|
records = _load_sensor_data(input_path)
|
||||||
|
|
||||||
|
for record in records:
|
||||||
|
try:
|
||||||
|
entry = LogEntry(
|
||||||
|
time=record.get('time', ''),
|
||||||
|
lux=float(record.get('lux', 0.0)),
|
||||||
|
dB=float(record.get('dB', 0.0)),
|
||||||
|
temperature=float(record.get('temperature', 0.0)),
|
||||||
|
inference_score=float(record.get('inference_score', 0.0))
|
||||||
|
)
|
||||||
|
except (TypeError, ValueError) as e:
|
||||||
|
print(f"Skipping invalid record: {record} – Error: {e}", file=sys.stderr)
|
||||||
|
continue
|
||||||
|
log_data(entry, output_path)
|
||||||
|
|
||||||
|
print(f"Logging complete. Data written to: {output_path}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
Loading…
Reference in a new issue