From 934712d4d8a1944cd947810eefd939970455784b Mon Sep 17 00:00:00 2001 From: Mika Date: Sun, 22 Feb 2026 12:32:36 +0000 Subject: [PATCH] Add artifact.timing_analysis/src/artifact_timing_analysis/core.py --- .../src/artifact_timing_analysis/core.py | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 artifact.timing_analysis/src/artifact_timing_analysis/core.py diff --git a/artifact.timing_analysis/src/artifact_timing_analysis/core.py b/artifact.timing_analysis/src/artifact_timing_analysis/core.py new file mode 100644 index 0000000..1dbd85d --- /dev/null +++ b/artifact.timing_analysis/src/artifact_timing_analysis/core.py @@ -0,0 +1,104 @@ +from __future__ import annotations +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import List, Dict, Any +import pandas as pd +import statistics + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +@dataclass +class TimingData: + timestamp: datetime + t_gate_read: float + t_index_visible: float + offset: float + + @staticmethod + def from_dict(d: Dict[str, Any]) -> 'TimingData': + try: + ts = d.get('timestamp') + if isinstance(ts, str): + ts = datetime.fromisoformat(ts) + elif not isinstance(ts, datetime): + raise ValueError('Invalid timestamp format') + + return TimingData( + timestamp=ts, + t_gate_read=float(d.get('t_gate_read', 0.0)), + t_index_visible=float(d.get('t_index_visible', 0.0)), + offset=float(d.get('offset', d.get('t_index_visible', 0.0) - d.get('t_gate_read', 0.0))) + ) + except Exception as e: + logger.error(f"Invalid TimingData input: {e}") + raise + + +_anomaly_cache: List[Dict[str, Any]] = [] + +def analyze_timing_offsets(timing_data: List[TimingData]) -> Dict[str, Any]: + if not timing_data: + raise ValueError("timing_data list is empty") + + df = pd.DataFrame([vars(td) for td in timing_data]) + if 'offset' not in df.columns: + raise ValueError('Missing offset field in timing data') + + valid_offsets = df['offset'].dropna() + if len(valid_offsets) == 0: + raise ValueError('No valid offset values found') + + mean = statistics.fmean(valid_offsets) + p95 = valid_offsets.quantile(0.95) + stddev = statistics.pstdev(valid_offsets) + + anomaly_threshold = mean + 2 * stddev + anomalies = df[df['offset'] > anomaly_threshold] + + global _anomaly_cache + _anomaly_cache = anomalies.to_dict(orient='records') + + result = { + 'count': len(df), + 'mean_offset': mean, + 'p95_offset': float(p95), + 'stddev_offset': stddev, + 'anomaly_threshold': anomaly_threshold, + 'anomaly_count': len(anomalies) + } + + # Assertions for CI readiness + assert 'mean_offset' in result + assert isinstance(result['count'], int) + + logger.info(f"Analyzed {len(df)} records, found {len(anomalies)} anomalies") + return result + + +def report_timing_anomalies() -> str: + global _anomaly_cache + if not _anomaly_cache: + return 'No anomalies detected.' + + lines = [ + 'Timing Anomalies Report', + '=======================', + f'Total anomalies: {len(_anomaly_cache)}', + '' + ] + + for i, anom in enumerate(_anomaly_cache, 1): + ts = anom.get('timestamp') + if isinstance(ts, datetime): + ts_str = ts.isoformat() + else: + ts_str = str(ts) + lines.append(f"{i}. Timestamp: {ts_str}, Offset: {anom.get('offset'):.4f}") + + report = '\n'.join(lines) + logger.info("Generated anomaly report") + return report