Add scheduling_analysis/src/scheduling_analysis/core.py

This commit is contained in:
Mika 2026-03-16 13:59:10 +00:00
parent 77475cd6bd
commit 160a765744

View file

@@ -0,0 +1,113 @@
from __future__ import annotations

import logging
from collections.abc import Mapping
from dataclasses import asdict, dataclass
from typing import Any, Dict, List

import pandas as pd
# Configure logging for CI readiness: set up the root logger at DEBUG level
# with a "[LEVEL] message" format, then create this module's own logger.
# NOTE(review): this runs at import time, so merely importing the module
# changes process-wide logging if no handlers are configured yet -- confirm
# this is intended, or move the basicConfig() call to the application entry point.
logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
class AnalysisError(Exception):
    """Signal that scheduling analysis input is invalid or processing failed."""
@dataclass
class AnalysisResult:
    """Value object holding the outcome of one scheduling analysis."""

    # Effect on the maximum outlier, stored as a float.
    max_outlier_effect: float
    # Shift of the resonance band, stored as a float.
    resonance_band_shift: float
    # Free-form per-metric summary keyed by metric name.
    metrics: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary (nested containers are copied)."""
        as_mapping = asdict(self)
        return as_mapping
def _validate_input_data(data: List[Dict[str, Any]]) -> pd.DataFrame:
    """Validate raw measurement records and load them into a DataFrame.

    Args:
        data: Non-empty list of mappings. Taken together, the records must
            provide the columns ``run_id``, ``metric_name``, ``metric_value``
            and ``mechanism``.

    Returns:
        The DataFrame built from ``data``.

    Raises:
        AnalysisError: If ``data`` is not a list, is empty, contains an entry
            that is not a mapping, lacks a required column, or its
            ``metric_value`` column does not have a numeric dtype.
    """
    if not isinstance(data, list):
        raise AnalysisError("Input 'data' must be a list of dictionaries.")
    if not data:
        raise AnalysisError("Input data list is empty.")
    # Check the elements too: mixed or non-mapping rows would otherwise reach
    # pandas and fail with an unrelated error or a misleading message.
    if not all(isinstance(row, Mapping) for row in data):
        raise AnalysisError("Input 'data' must be a list of dictionaries.")

    df = pd.DataFrame(data)

    required_cols = {"run_id", "metric_name", "metric_value", "mechanism"}
    missing = required_cols - set(df.columns)
    if missing:
        # Sort so the message is deterministic (set iteration order is not).
        raise AnalysisError(
            f"Missing required fields: {', '.join(sorted(missing))}"
        )

    # Validate metric_value types
    if not pd.api.types.is_numeric_dtype(df["metric_value"]):
        raise AnalysisError("Field 'metric_value' must be numeric.")

    logger.debug("Input data validation passed with %d rows.", len(df))
    return df
def analyze_scheduling_effects(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze how scheduling mechanisms affect the resonance band and max outlier.

    The records are averaged per (mechanism, metric_name) pair. A mechanism
    that has no measurement for a metric is left out of that metric's
    statistics rather than being counted as 0, so sparse data does not
    inflate the reported spreads.

    Args:
        data: List of measurement records; see ``_validate_input_data`` for
            the required fields.

    Returns:
        A dictionary with the keys:
            ``max_outlier_effect``: max - min of the per-mechanism mean of
                ``max_outlier_ms`` (0.0 if that metric is absent).
            ``resonance_band_shift``: max - min of the per-mechanism mean of
                ``band_width_h`` (0.0 if that metric is absent).
            ``metrics``: for every metric, the ``mean`` and population
                ``std`` (ddof=0) of its per-mechanism means.

    Raises:
        AnalysisError: If validation fails, the pivot cannot be built, or the
            pivot contains no data.
    """
    df = _validate_input_data(data)

    # Group by mechanism and metric_name for aggregation. Missing
    # combinations stay NaN; pandas reductions skip NaN by default.
    try:
        pivot = df.pivot_table(
            index="mechanism",
            columns="metric_name",
            values="metric_value",
            aggfunc="mean",
        ).reset_index()
    except Exception as exc:
        # Boundary: surface any pandas failure as the module's own error type.
        raise AnalysisError(f"Failed to pivot data for analysis: {exc}") from exc

    if pivot.empty:
        raise AnalysisError("Pivot table generated no data.")
    logger.debug("Pivot table created with shape: %s", pivot.shape)

    max_outlier_effect = _metric_spread(pivot, "max_outlier_ms")
    logger.debug("Calculated max_outlier_effect: %.4f", max_outlier_effect)

    resonance_band_shift = _metric_spread(pivot, "band_width_h")
    logger.debug("Calculated resonance_band_shift: %.4f", resonance_band_shift)

    # Summarize every metric column; "mechanism" is the index column that
    # reset_index() turned into a regular column, not a metric.
    metrics_summary = {
        metric: {
            "mean": float(pivot[metric].mean()),
            "std": float(pivot[metric].std(ddof=0)),
        }
        for metric in pivot.columns
        if metric != "mechanism"
    }
    logger.debug("Metrics summary computed for %d metrics.", len(metrics_summary))

    result = AnalysisResult(
        max_outlier_effect=max_outlier_effect,
        resonance_band_shift=resonance_band_shift,
        metrics=metrics_summary,
    )
    return result.to_dict()


def _metric_spread(pivot: pd.DataFrame, column: str) -> float:
    """Return max - min of ``column`` across mechanisms, or 0.0 if it is absent."""
    if column not in pivot.columns:
        return 0.0
    values = pivot[column]
    return float(values.max() - values.min())