diff --git a/ir_gain_test/src/ir_gain_test/core.py b/ir_gain_test/src/ir_gain_test/core.py new file mode 100644 index 0000000..5dd65b5 --- /dev/null +++ b/ir_gain_test/src/ir_gain_test/core.py @@ -0,0 +1,105 @@ +from __future__ import annotations +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import List, Dict, Any +import statistics +import pandas as pd + + +# Configure basic logging for reproducibility +logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s') +logger = logging.getLogger(__name__) + + +@dataclass +class IntensityDataEntry: + timestamp: datetime + gain_value: float + intensity: float + + @staticmethod + def from_dict(data: Dict[str, Any]) -> 'IntensityDataEntry': + try: + ts = data.get('timestamp') + if isinstance(ts, str): + try: + timestamp = datetime.fromisoformat(ts) + except ValueError: + raise ValueError(f"Invalid timestamp format: {ts}") + elif isinstance(ts, datetime): + timestamp = ts + else: + raise ValueError("Missing or invalid timestamp field.") + + gain_value = float(data['gain_value']) + intensity = float(data['intensity']) + except (KeyError, TypeError, ValueError) as e: + raise ValueError(f"Invalid intensity data entry: {e}") from e + return IntensityDataEntry(timestamp, gain_value, intensity) + + +@dataclass +class AnalysisReport: + mean_intensity: float + signal_to_noise_ratio: float + optimal_gain: float + + def to_dict(self) -> Dict[str, Any]: + return { + "mean_intensity": self.mean_intensity, + "signal_to_noise_ratio": self.signal_to_noise_ratio, + "optimal_gain": self.optimal_gain, + } + + +def analyze_ir_gain(gain_values: List[float], intensity_data: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze intensity data across gain settings and produce performance metrics. + + Args: + gain_values: List of gain settings tested. + intensity_data: List of dictionaries following IntensityDataEntry structure. + + Returns: + A dictionary representing the AnalysisReport. + """ + + if not gain_values or not intensity_data: + raise ValueError("gain_values and intensity_data must not be empty.") + + logger.info("Starting IR gain analysis with %d gain values and %d data points.", len(gain_values), len(intensity_data)) + + entries: List[IntensityDataEntry] = [IntensityDataEntry.from_dict(d) for d in intensity_data] + + # Build DataFrame for flexible analysis + df = pd.DataFrame([{ + 'timestamp': e.timestamp, + 'gain_value': e.gain_value, + 'intensity': e.intensity + } for e in entries]) + + # Compute global statistics + mean_intensity = df['intensity'].mean() + std_intensity = df['intensity'].std(ddof=1) if len(df) > 1 else 0.0 + signal_to_noise_ratio = (mean_intensity / std_intensity) if std_intensity > 0 else float('inf') + logger.info("Mean intensity: %.4f, Std deviation: %.4f, SNR: %.4f", mean_intensity, std_intensity, signal_to_noise_ratio) + + # Find optimal gain based on stable and strong signal region + grouped = df.groupby('gain_value')['intensity'].agg(['mean', 'std']).reset_index() + grouped['snr'] = grouped['mean'] / grouped['std'].replace(0.0, float('inf')) + + # Define heuristic for optimal gain: max SNR but avoid unstable extremes + optimal_idx = grouped['snr'].idxmax() + optimal_gain = grouped.loc[optimal_idx, 'gain_value'] + logger.info("Computed optimal gain value: %.4f", optimal_gain) + + report = AnalysisReport( + mean_intensity=float(round(mean_intensity, 4)), + signal_to_noise_ratio=float(round(signal_to_noise_ratio, 4)), + optimal_gain=float(round(optimal_gain, 4)), + ) + + # Validation + assert report.mean_intensity >= 0.0, "Mean intensity should be non-negative" + + return report.to_dict()