From 21451204eb02c8d21116753458e18d6d7d6c9c93 Mon Sep 17 00:00:00 2001 From: Mika Date: Tue, 17 Feb 2026 16:16:30 +0000 Subject: [PATCH] Add decision_rule_generation/src/decision_rule_generation/core.py --- .../src/decision_rule_generation/core.py | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 decision_rule_generation/src/decision_rule_generation/core.py diff --git a/decision_rule_generation/src/decision_rule_generation/core.py b/decision_rule_generation/src/decision_rule_generation/core.py new file mode 100644 index 0000000..df556df --- /dev/null +++ b/decision_rule_generation/src/decision_rule_generation/core.py @@ -0,0 +1,103 @@ +from __future__ import annotations +import json +import logging +from pathlib import Path +from typing import Any, Dict, Union +import pandas as pd + + +logger = logging.getLogger(__name__) +if not logger.handlers: + logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s') + + +class DecisionRuleGenerationError(Exception): + """Custom exception for decision rule generation-related errors.""" + + + +def _validate_grid_results(df: pd.DataFrame) -> None: + """Validates that required columns exist and have correct dtypes.""" + required_columns = { + 'phase': str, + 'grace_minutes': (int, float), + 'delay_seconds': (int, float), + 'coverage_percent': (int, float), + 'unknown_rate_percent': (int, float), + 'worst_case_delay': (int, float), + } + for col, dtype in required_columns.items(): + if col not in df.columns: + raise DecisionRuleGenerationError(f"Missing required column: {col}") + if not df[col].apply(lambda x: isinstance(x, dtype)).all(): + raise DecisionRuleGenerationError(f"Column {col} contains invalid data types.") + + + +def generate_decision_rules(grid_results: Union[pd.DataFrame, str]) -> Dict[str, Any]: + """Liest grid_results.csv ein, analysiert kombinierte Metriken und gibt optimale Policy-Konfigurationen zurück.""" + logger.info("Starting decision rule generation.") + + if isinstance(grid_results, str): + csv_path = Path(grid_results) + if not csv_path.exists(): + raise FileNotFoundError(f"Grid results file not found: {grid_results}") + df = pd.read_csv(csv_path) + elif isinstance(grid_results, pd.DataFrame): + df = grid_results.copy() + else: + raise TypeError("grid_results must be a pandas DataFrame or a path string.") + + _validate_grid_results(df) + + # Strategy: sort by highest coverage_percent, lowest worst_case_delay. + df_sorted = df.sort_values(by=['phase', 'coverage_percent', 'worst_case_delay'], ascending=[True, False, True]) + + best_rules: Dict[str, Dict[str, Union[int, float]]] = { + 'policy_pinned': {}, + 'policy_unpinned': {} + } + + for phase in ['pinned', 'unpinned']: + subset = df_sorted[df_sorted['phase'].str.lower() == phase] + if subset.empty: + logger.warning(f"No data found for phase '{phase}'. Skipping.") + continue + best_row = subset.iloc[0] + best_rules[f'policy_{phase}'] = { + 'grace_minutes': int(best_row['grace_minutes']), + 'delay_seconds': int(best_row['delay_seconds']), + 'coverage_percent': float(best_row['coverage_percent']), + 'worst_case_delay': float(best_row['worst_case_delay']), + } + logger.info(f"Best rule for {phase}: {best_rules[f'policy_{phase}']}.") + + logger.info("Decision rule generation complete.") + assert 'policy_pinned' in best_rules and 'policy_unpinned' in best_rules, 'Decision rules must have both pinned and unpinned.' + return best_rules + + +def update_policy_constants(decision_rules: Dict[str, Any], policy_constants_file: str) -> None: + """Aktualisiert policy_constants.json mit den generierten Entscheidungsregeln.""" + logger.info(f"Updating policy constants file: {policy_constants_file}") + + path = Path(policy_constants_file) + if not path.exists(): + raise FileNotFoundError(f"Policy constants file not found: {policy_constants_file}") + + with open(path, 'r', encoding='utf-8') as f: + try: + existing_data = json.load(f) + except json.JSONDecodeError as e: + raise DecisionRuleGenerationError(f"Invalid JSON in policy_constants file: {e}") + + if not isinstance(existing_data, dict): + raise DecisionRuleGenerationError("Invalid structure in policy_constants file: expected a JSON object.") + + existing_data.update(decision_rules) + + with open(path, 'w', encoding='utf-8') as f: + json.dump(existing_data, f, indent=2, ensure_ascii=False) + + logger.info("Policy constants file successfully updated.") + assert path.exists(), "Policy constants file must exist after update." \ No newline at end of file