Add decision_rule_generation/src/decision_rule_generation/core.py

Mika 2026-02-17 16:16:30 +00:00
parent d810b00b2d
commit 21451204eb


@@ -0,0 +1,103 @@
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Dict, Union

import pandas as pd

logger = logging.getLogger(__name__)
if not logger.handlers:
    logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')


class DecisionRuleGenerationError(Exception):
    """Custom exception for decision rule generation-related errors."""


def _validate_grid_results(df: pd.DataFrame) -> None:
    """Validate that required columns exist and have the expected dtypes."""
    # Dtype checks are done at column level with pandas helpers; per-element
    # isinstance(x, int) checks would wrongly reject integer columns read from
    # CSV, because numpy integer scalars are not subclasses of Python int.
    required_columns = {
        'phase': pd.api.types.is_string_dtype,
        'grace_minutes': pd.api.types.is_numeric_dtype,
        'delay_seconds': pd.api.types.is_numeric_dtype,
        'coverage_percent': pd.api.types.is_numeric_dtype,
        'unknown_rate_percent': pd.api.types.is_numeric_dtype,
        'worst_case_delay': pd.api.types.is_numeric_dtype,
    }
    for col, dtype_check in required_columns.items():
        if col not in df.columns:
            raise DecisionRuleGenerationError(f"Missing required column: {col}")
        if not dtype_check(df[col]):
            raise DecisionRuleGenerationError(f"Column {col} contains invalid data types.")


def generate_decision_rules(grid_results: Union[pd.DataFrame, str]) -> Dict[str, Any]:
    """Read grid_results.csv, analyse the combined metrics and return optimal policy configurations."""
    logger.info("Starting decision rule generation.")
    if isinstance(grid_results, str):
        csv_path = Path(grid_results)
        if not csv_path.exists():
            raise FileNotFoundError(f"Grid results file not found: {grid_results}")
        df = pd.read_csv(csv_path)
    elif isinstance(grid_results, pd.DataFrame):
        df = grid_results.copy()
    else:
        raise TypeError("grid_results must be a pandas DataFrame or a path string.")

    _validate_grid_results(df)

    # Strategy: within each phase, prefer the highest coverage_percent and break
    # ties on the lowest worst_case_delay.
    df_sorted = df.sort_values(
        by=['phase', 'coverage_percent', 'worst_case_delay'],
        ascending=[True, False, True],
    )

    best_rules: Dict[str, Dict[str, Union[int, float]]] = {
        'policy_pinned': {},
        'policy_unpinned': {},
    }
    for phase in ['pinned', 'unpinned']:
        subset = df_sorted[df_sorted['phase'].str.lower() == phase]
        if subset.empty:
            logger.warning(f"No data found for phase '{phase}'. Skipping.")
            continue
        best_row = subset.iloc[0]
        best_rules[f'policy_{phase}'] = {
            'grace_minutes': int(best_row['grace_minutes']),
            'delay_seconds': int(best_row['delay_seconds']),
            'coverage_percent': float(best_row['coverage_percent']),
            'worst_case_delay': float(best_row['worst_case_delay']),
        }
        logger.info(f"Best rule for {phase}: {best_rules[f'policy_{phase}']}.")

    logger.info("Decision rule generation complete.")
    assert 'policy_pinned' in best_rules and 'policy_unpinned' in best_rules, 'Decision rules must have both pinned and unpinned.'
    return best_rules


def update_policy_constants(decision_rules: Dict[str, Any], policy_constants_file: str) -> None:
    """Update policy_constants.json with the generated decision rules."""
    logger.info(f"Updating policy constants file: {policy_constants_file}")
    path = Path(policy_constants_file)
    if not path.exists():
        raise FileNotFoundError(f"Policy constants file not found: {policy_constants_file}")
    with open(path, 'r', encoding='utf-8') as f:
        try:
            existing_data = json.load(f)
        except json.JSONDecodeError as e:
            raise DecisionRuleGenerationError(f"Invalid JSON in policy_constants file: {e}") from e
    if not isinstance(existing_data, dict):
        raise DecisionRuleGenerationError("Invalid structure in policy_constants file: expected a JSON object.")
    existing_data.update(decision_rules)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(existing_data, f, indent=2, ensure_ascii=False)
    logger.info("Policy constants file successfully updated.")
    assert path.exists(), "Policy constants file must exist after update."
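
A minimal usage sketch of the two functions in this file; the paths below are illustrative assumptions, not part of the module:

    from decision_rule_generation.core import generate_decision_rules, update_policy_constants

    # Hypothetical locations; point these at the real sweep output and constants file.
    rules = generate_decision_rules("results/grid_results.csv")
    update_policy_constants(rules, "config/policy_constants.json")

generate_decision_rules returns a dict with 'policy_pinned' and 'policy_unpinned' entries, which update_policy_constants merges into the existing JSON object on disk.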