From f5a6edd86c7319e533577470f67f225a13431df4 Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 8 Mar 2026 03:07:04 +0000 Subject: [PATCH] Add data_logging/src/data_logging/cli.py --- data_logging/src/data_logging/cli.py | 64 ++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 data_logging/src/data_logging/cli.py diff --git a/data_logging/src/data_logging/cli.py b/data_logging/src/data_logging/cli.py new file mode 100644 index 0000000..f73ad5f --- /dev/null +++ b/data_logging/src/data_logging/cli.py @@ -0,0 +1,64 @@ +import argparse +import json +import sys +from pathlib import Path +from typing import Any +from data_logging.core import LogEntry, log_data + + +def _validate_file_path(path_str: str, mode: str = 'r') -> Path: + """Validates and returns a Path for given input/output file.""" + path = Path(path_str) + if mode == 'r' and not path.exists(): + sys.exit(f"Input file not found: {path}") + if mode == 'w': + path.parent.mkdir(parents=True, exist_ok=True) + return path + + +def _load_sensor_data(input_path: Path) -> list[dict[str, Any]]: + """Loads sensor JSON data from file and validates structure minimally.""" + with input_path.open('r', encoding='utf-8') as f: + try: + data = json.load(f) + except json.JSONDecodeError as e: + sys.exit(f"Invalid JSON format in {input_path}: {e}") + + if isinstance(data, dict): + data = [data] + if not isinstance(data, list): + sys.exit("Input JSON must be an object or list of objects.") + return data + + +def main() -> None: + """CLI entry point for Rover Data Logging.""" + parser = argparse.ArgumentParser(description="Rover Data Logging CLI") + parser.add_argument('--input', required=True, help='Pfad oder Datenquelle (JSON)') + parser.add_argument('--output', required=True, help='Pfad zur Ausgabedatei (CSV/JSON)') + args = parser.parse_args() + + input_path = _validate_file_path(args.input, 'r') + output_path = _validate_file_path(args.output, 'w') + + records = _load_sensor_data(input_path) + + for record in records: + try: + entry = LogEntry( + time=record.get('time', ''), + lux=float(record.get('lux', 0.0)), + dB=float(record.get('dB', 0.0)), + temperature=float(record.get('temperature', 0.0)), + inference_score=float(record.get('inference_score', 0.0)) + ) + except (TypeError, ValueError) as e: + print(f"Skipping invalid record: {record} – Error: {e}", file=sys.stderr) + continue + log_data(entry, output_path) + + print(f"Logging complete. Data written to: {output_path}") + + +if __name__ == '__main__': + main()