Add laser_echo_analysis/src/laser_echo_analysis/cli.py

This commit is contained in:
Mika 2026-05-17 02:07:33 +00:00
parent 41c9a3c623
commit 512b9c5517

View file

@ -0,0 +1,70 @@
import argparse
import json
import os
from pathlib import Path
from typing import Any
import pandas as pd
from laser_echo_analysis.core import analyze_data
from laser_echo_analysis.visualization import visualize_results
def _read_csv(file_path: Path) -> list[dict[str, Any]]:
"""Read laser measurement CSV into list of dicts with validated fields."""
if not file_path.exists() or not file_path.is_file():
raise FileNotFoundError(f"CSV input file not found: {file_path}")
try:
df = pd.read_csv(file_path)
except Exception as e:
raise ValueError(f"Failed to read CSV file: {e}") from e
required_columns = {"timestamp", "pixel_value", "delta_t"}
if not required_columns.issubset(df.columns):
missing = required_columns - set(df.columns)
raise ValueError(f"CSV file missing required columns: {', '.join(missing)}")
data_records = df.to_dict(orient="records")
return data_records
def _write_json(output_path: Path, data: dict[str, Any]) -> None:
"""Write analysis results to JSON file."""
try:
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
raise IOError(f"Failed to write JSON output: {e}") from e
def main() -> None:
"""CLI entry point for laser echo analysis."""
parser = argparse.ArgumentParser(description="Satellite Laser Echo Data Analysis")
parser.add_argument(
"--input",
required=True,
help="Pfad zur CSV-Eingabedatei mit Messdaten.",
)
parser.add_argument(
"--output",
required=False,
help="Zielpfad für die Analyseergebnisse im JSON-Format.",
)
args = parser.parse_args()
input_path = Path(args.input).resolve()
output_path = Path(args.output).resolve() if args.output else None
data = _read_csv(input_path)
analysis_results = analyze_data(data)
if output_path:
_write_json(output_path, analysis_results)
else:
print(json.dumps(analysis_results, indent=2, ensure_ascii=False))
visualize_results(analysis_results)
if __name__ == "__main__":
main()