Add laser_echo_analysis/src/laser_echo_analysis/core.py

This commit is contained in:
Mika 2026-05-17 02:07:32 +00:00
parent 081b87b8d9
commit afe54c3d96

View file

@ -0,0 +1,103 @@
from __future__ import annotations
import logging
import math
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataValidationError(Exception):
"""Custom exception for data validation errors."""
pass
@dataclass
class LaserMeasurement:
timestamp: pd.Timestamp
pixel_value: int
delta_t: float
@dataclass
class AnalysisResult:
peak: int
average_noise: float
signal_to_noise_ratio: float
def _validate_input_data(data: List[Dict[str, Any]]) -> pd.DataFrame:
"""Validate and convert input data into pandas DataFrame."""
if not isinstance(data, list):
raise DataValidationError("Input data must be a list of dictionaries.")
if not data:
raise DataValidationError("Input data list is empty.")
df = pd.DataFrame(data)
required_columns = {"timestamp", "pixel_value", "delta_t"}
missing = required_columns - set(df.columns)
if missing:
raise DataValidationError(f"Missing required fields: {', '.join(missing)}")
try:
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="raise")
df["pixel_value"] = df["pixel_value"].astype(int)
df["delta_t"] = df["delta_t"].astype(float)
except Exception as e:
raise DataValidationError(f"Data type conversion failed: {e}")
return df
def analyze_data(data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Analysiert Rohmessdaten aus einer CSV-Datei und berechnet Kennzahlen wie Spitzenwert,
durchschnittliches Rauschen und Signal-zu-Rausch-Verhältnis.
Args:
data (list[dict]): Liste der Messdatensätze, eingelesen aus der CSV-Datei.
Returns:
dict: Analyseergebnisse inklusive peak, average_noise und signal_to_noise_ratio.
"""
logger.info("Starting laser data analysis on %d records", len(data))
df = _validate_input_data(data)
if df.empty:
raise DataValidationError("DataFrame is empty after validation.")
peak = int(df["pixel_value"].max())
noise_values = df.loc[
(df["pixel_value"] < peak * 0.1) & (df["pixel_value"] > 0), "pixel_value"
]
average_noise = float(noise_values.mean()) if not noise_values.empty else 0.0
if average_noise <= 0:
signal_to_noise_ratio = math.inf
else:
signal_to_noise_ratio = float(peak / average_noise)
result = AnalysisResult(
peak=peak,
average_noise=average_noise,
signal_to_noise_ratio=signal_to_noise_ratio,
)
result_dict = {
"peak": result.peak,
"average_noise": result.average_noise,
"signal_to_noise_ratio": result.signal_to_noise_ratio,
}
logger.info("Analysis completed: %s", result_dict)
assert all(k in result_dict for k in ("peak", "average_noise", "signal_to_noise_ratio"))
return result_dict