Add ir_gain_test/src/ir_gain_test/core.py

This commit is contained in:
Mika 2026-05-10 02:07:42 +00:00
parent 2529afd9fc
commit e34523230b

View file

@ -0,0 +1,105 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any
import statistics
import pandas as pd
# Configure basic logging for reproducibility
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class IntensityDataEntry:
timestamp: datetime
gain_value: float
intensity: float
@staticmethod
def from_dict(data: Dict[str, Any]) -> 'IntensityDataEntry':
try:
ts = data.get('timestamp')
if isinstance(ts, str):
try:
timestamp = datetime.fromisoformat(ts)
except ValueError:
raise ValueError(f"Invalid timestamp format: {ts}")
elif isinstance(ts, datetime):
timestamp = ts
else:
raise ValueError("Missing or invalid timestamp field.")
gain_value = float(data['gain_value'])
intensity = float(data['intensity'])
except (KeyError, TypeError, ValueError) as e:
raise ValueError(f"Invalid intensity data entry: {e}") from e
return IntensityDataEntry(timestamp, gain_value, intensity)
@dataclass
class AnalysisReport:
mean_intensity: float
signal_to_noise_ratio: float
optimal_gain: float
def to_dict(self) -> Dict[str, Any]:
return {
"mean_intensity": self.mean_intensity,
"signal_to_noise_ratio": self.signal_to_noise_ratio,
"optimal_gain": self.optimal_gain,
}
def analyze_ir_gain(gain_values: List[float], intensity_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze intensity data across gain settings and produce performance metrics.
Args:
gain_values: List of gain settings tested.
intensity_data: List of dictionaries following IntensityDataEntry structure.
Returns:
A dictionary representing the AnalysisReport.
"""
if not gain_values or not intensity_data:
raise ValueError("gain_values and intensity_data must not be empty.")
logger.info("Starting IR gain analysis with %d gain values and %d data points.", len(gain_values), len(intensity_data))
entries: List[IntensityDataEntry] = [IntensityDataEntry.from_dict(d) for d in intensity_data]
# Build DataFrame for flexible analysis
df = pd.DataFrame([{
'timestamp': e.timestamp,
'gain_value': e.gain_value,
'intensity': e.intensity
} for e in entries])
# Compute global statistics
mean_intensity = df['intensity'].mean()
std_intensity = df['intensity'].std(ddof=1) if len(df) > 1 else 0.0
signal_to_noise_ratio = (mean_intensity / std_intensity) if std_intensity > 0 else float('inf')
logger.info("Mean intensity: %.4f, Std deviation: %.4f, SNR: %.4f", mean_intensity, std_intensity, signal_to_noise_ratio)
# Find optimal gain based on stable and strong signal region
grouped = df.groupby('gain_value')['intensity'].agg(['mean', 'std']).reset_index()
grouped['snr'] = grouped['mean'] / grouped['std'].replace(0.0, float('inf'))
# Define heuristic for optimal gain: max SNR but avoid unstable extremes
optimal_idx = grouped['snr'].idxmax()
optimal_gain = grouped.loc[optimal_idx, 'gain_value']
logger.info("Computed optimal gain value: %.4f", optimal_gain)
report = AnalysisReport(
mean_intensity=float(round(mean_intensity, 4)),
signal_to_noise_ratio=float(round(signal_to_noise_ratio, 4)),
optimal_gain=float(round(optimal_gain, 4)),
)
# Validation
assert report.mean_intensity >= 0.0, "Mean intensity should be non-negative"
return report.to_dict()