diff --git a/laser_echo_analysis/src/laser_echo_analysis/core.py b/laser_echo_analysis/src/laser_echo_analysis/core.py new file mode 100644 index 0000000..5537ef3 --- /dev/null +++ b/laser_echo_analysis/src/laser_echo_analysis/core.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import logging +import math +from dataclasses import dataclass +from typing import List, Dict, Any + +import pandas as pd + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class DataValidationError(Exception): + """Custom exception for data validation errors.""" + pass + + +@dataclass +class LaserMeasurement: + timestamp: pd.Timestamp + pixel_value: int + delta_t: float + + +@dataclass +class AnalysisResult: + peak: int + average_noise: float + signal_to_noise_ratio: float + + + +def _validate_input_data(data: List[Dict[str, Any]]) -> pd.DataFrame: + """Validate and convert input data into pandas DataFrame.""" + if not isinstance(data, list): + raise DataValidationError("Input data must be a list of dictionaries.") + if not data: + raise DataValidationError("Input data list is empty.") + + df = pd.DataFrame(data) + required_columns = {"timestamp", "pixel_value", "delta_t"} + missing = required_columns - set(df.columns) + if missing: + raise DataValidationError(f"Missing required fields: {', '.join(missing)}") + + try: + df["timestamp"] = pd.to_datetime(df["timestamp"], errors="raise") + df["pixel_value"] = df["pixel_value"].astype(int) + df["delta_t"] = df["delta_t"].astype(float) + except Exception as e: + raise DataValidationError(f"Data type conversion failed: {e}") + + return df + + +def analyze_data(data: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analysiert Rohmessdaten aus einer CSV-Datei und berechnet Kennzahlen wie Spitzenwert, + durchschnittliches Rauschen und Signal-zu-Rausch-Verhältnis. + + Args: + data (list[dict]): Liste der Messdatensätze, eingelesen aus der CSV-Datei. + + Returns: + dict: Analyseergebnisse inklusive peak, average_noise und signal_to_noise_ratio. + """ + logger.info("Starting laser data analysis on %d records", len(data)) + + df = _validate_input_data(data) + + if df.empty: + raise DataValidationError("DataFrame is empty after validation.") + + peak = int(df["pixel_value"].max()) + + noise_values = df.loc[ + (df["pixel_value"] < peak * 0.1) & (df["pixel_value"] > 0), "pixel_value" + ] + average_noise = float(noise_values.mean()) if not noise_values.empty else 0.0 + + if average_noise <= 0: + signal_to_noise_ratio = math.inf + else: + signal_to_noise_ratio = float(peak / average_noise) + + result = AnalysisResult( + peak=peak, + average_noise=average_noise, + signal_to_noise_ratio=signal_to_noise_ratio, + ) + + result_dict = { + "peak": result.peak, + "average_noise": result.average_noise, + "signal_to_noise_ratio": result.signal_to_noise_ratio, + } + + logger.info("Analysis completed: %s", result_dict) + assert all(k in result_dict for k in ("peak", "average_noise", "signal_to_noise_ratio")) + + return result_dict