Add data_analysis/src/data_analysis/core.py

This commit is contained in:
Mika 2026-02-24 13:33:05 +00:00
parent 6be3c5b35c
commit 5cd3549a76

View file

@ -0,0 +1,73 @@
from __future__ import annotations
import pandas as pd
from typing import List, Dict
class DataValidationError(ValueError):
    """Signal that run data handed to this module is malformed or incomplete.

    Derives from ``ValueError``, so handlers written for ``ValueError`` also
    catch this exception.
    """
def _validate_run_data(data: List[dict]) -> None:
required_fields = {"policy_hash", "warn_rate", "unknown_rate", "delta_time"}
if not isinstance(data, list):
raise DataValidationError("Input data must be a list of dictionaries.")
for entry in data:
if not isinstance(entry, dict):
raise DataValidationError("Each run entry must be a dictionary.")
missing = required_fields - set(entry.keys())
if missing:
raise DataValidationError(f"Missing required fields: {missing}")
for field in required_fields:
if field in ("warn_rate", "unknown_rate", "delta_time"):
try:
float(entry[field])
except (TypeError, ValueError):
raise DataValidationError(
f"Field '{field}' must be convertible to float."
)
def calculate_warn_rate(data: List[dict]) -> float:
    """Compute the average warning rate across all runs.

    Args:
        data: List of run data records, each with a 'warn_rate' field.

    Returns:
        Average warning rate as a float. If data is empty, returns 0.0.

    Raises:
        DataValidationError: If ``data`` fails validation, or if the
            resulting mean is negative or NaN.
    """
    _validate_run_data(data)
    if not data:
        return 0.0

    # Convert with float(), the same conversion the validator has already
    # checked, and build only the one column that is needed.
    rates = pd.Series([float(entry["warn_rate"]) for entry in data], dtype=float)
    mean_warn_rate = float(rates.mean())

    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    # ``not (x >= 0.0)`` is also true for NaN, which ``x < 0.0`` would miss.
    if not mean_warn_rate >= 0.0:
        raise DataValidationError("Mean warning rate should be non-negative.")
    return mean_warn_rate
def delta_time_distribution(data: List[dict]) -> Dict[str, float]:
    """Calculate distribution statistics for the 'delta_time' field.

    Args:
        data: List of run data records, each with a 'delta_time' field.

    Returns:
        Dictionary with the keys 'mean', 'variance', 'min' and 'max'.
        The variance is the population variance (``ddof=0``). If data is
        empty, every metric is 0.0.

    Raises:
        DataValidationError: If ``data`` fails validation.
    """
    _validate_run_data(data)
    if not data:
        return {"mean": 0.0, "variance": 0.0, "min": 0.0, "max": 0.0}

    # Convert with float(), the same conversion the validator has already
    # checked, and build only the one column that is needed.
    deltas = pd.Series([float(entry["delta_time"]) for entry in data], dtype=float)

    # Each statistic is wrapped in float() so the result holds plain Python
    # floats rather than numpy scalars.
    return {
        "mean": float(deltas.mean()),
        "variance": float(deltas.var(ddof=0)),
        "min": float(deltas.min()),
        "max": float(deltas.max()),
    }