Add artifact.timing_analysis/src/artifact_timing_analysis/core.py

Mika 2026-02-22 12:32:36 +00:00
parent cac7e4d126
commit 934712d4d8

@@ -0,0 +1,104 @@
from __future__ import annotations

import logging
import statistics
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import pandas as pd

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@dataclass
class TimingData:
    """A single timing measurement and the derived gate-read-to-index-visible offset."""

    timestamp: datetime
    t_gate_read: float
    t_index_visible: float
    offset: float

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> TimingData:
        """Build a TimingData record from a raw dict, deriving the offset if absent."""
        try:
            ts = d.get('timestamp')
            if isinstance(ts, str):
                ts = datetime.fromisoformat(ts)
            elif not isinstance(ts, datetime):
                raise ValueError('Invalid timestamp format')
            t_gate_read = float(d.get('t_gate_read', 0.0))
            t_index_visible = float(d.get('t_index_visible', 0.0))
            # Fall back to the derived offset when an explicit one is not supplied.
            offset = float(d['offset']) if 'offset' in d else t_index_visible - t_gate_read
            return TimingData(
                timestamp=ts,
                t_gate_read=t_gate_read,
                t_index_visible=t_index_visible,
                offset=offset,
            )
        except Exception as e:
            logger.error(f"Invalid TimingData input: {e}")
            raise
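# Most recent anomaly records, populated by analyze_timing_offsets()
# and read by report_timing_anomalies().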
_anomaly_cache: List[Dict[str, Any]] = []
def analyze_timing_offsets(timing_data: List[TimingData]) -> Dict[str, Any]:
    """Compute summary statistics over timing offsets.

    Records whose offset exceeds mean + 2 standard deviations are flagged
    as anomalies and cached for report_timing_anomalies().
    """
    if not timing_data:
        raise ValueError("timing_data list is empty")
    df = pd.DataFrame([vars(td) for td in timing_data])
    if 'offset' not in df.columns:
        raise ValueError('Missing offset field in timing data')
    valid_offsets = df['offset'].dropna()
    if len(valid_offsets) == 0:
        raise ValueError('No valid offset values found')
    mean = statistics.fmean(valid_offsets)
    p95 = valid_offsets.quantile(0.95)
    stddev = statistics.pstdev(valid_offsets)
    # Anything more than two standard deviations above the mean is treated as anomalous.
    anomaly_threshold = mean + 2 * stddev
    anomalies = df[df['offset'] > anomaly_threshold]
    global _anomaly_cache
    _anomaly_cache = anomalies.to_dict(orient='records')
    result = {
        'count': len(df),
        'mean_offset': mean,
        'p95_offset': float(p95),
        'stddev_offset': stddev,
        'anomaly_threshold': anomaly_threshold,
        'anomaly_count': len(anomalies),
    }
    # Sanity checks on the result shape, kept for CI readiness.
    assert 'mean_offset' in result
    assert isinstance(result['count'], int)
    logger.info(f"Analyzed {len(df)} records, found {len(anomalies)} anomalies")
    return result
def report_timing_anomalies() -> str:
    """Render the anomalies cached by the last analysis run as a plain-text report."""
    if not _anomaly_cache:
        return 'No anomalies detected.'
    lines = [
        'Timing Anomalies Report',
        '=======================',
        f'Total anomalies: {len(_anomaly_cache)}',
        '',
    ]
    for i, anom in enumerate(_anomaly_cache, 1):
        ts = anom.get('timestamp')
        ts_str = ts.isoformat() if isinstance(ts, datetime) else str(ts)
        lines.append(f"{i}. Timestamp: {ts_str}, Offset: {anom.get('offset'):.4f}")
    report = '\n'.join(lines)
    logger.info("Generated anomaly report")
    return report
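

if __name__ == "__main__":
    # Minimal usage sketch of the module's public functions; the sample
    # values below are illustrative assumptions, not real measurements.
    sample = [
        TimingData.from_dict({
            "timestamp": "2026-02-22T12:00:00",
            "t_gate_read": 0.120,
            "t_index_visible": 0.180,
        }),
        TimingData.from_dict({
            "timestamp": "2026-02-22T12:00:05",
            "t_gate_read": 0.110,
            "t_index_visible": 0.940,  # noticeably larger offset than the others
        }),
        TimingData.from_dict({
            "timestamp": "2026-02-22T12:00:10",
            "t_gate_read": 0.115,
            "t_index_visible": 0.170,
        }),
    ]
    summary = analyze_timing_offsets(sample)
    print(summary)
    print(report_timing_anomalies())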