From 53afd383cc429b7b459f309e78ad642c797bf7f6 Mon Sep 17 00:00:00 2001 From: Mika Date: Mon, 16 Mar 2026 13:59:12 +0000 Subject: [PATCH] Add simulation_tool/src/simulation_tool/core.py --- simulation_tool/src/simulation_tool/core.py | 111 ++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 simulation_tool/src/simulation_tool/core.py diff --git a/simulation_tool/src/simulation_tool/core.py b/simulation_tool/src/simulation_tool/core.py new file mode 100644 index 0000000..f6a258d --- /dev/null +++ b/simulation_tool/src/simulation_tool/core.py @@ -0,0 +1,111 @@ +from __future__ import annotations +import logging +import random +import statistics +from typing import Dict, List +import numpy as np +import pandas as pd + +from simulation_tool.models import SimulationResults + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class SimulationError(Exception): + """Custom Exception for any simulation-level error.""" + pass + + +def _validate_schedule_params(schedule_params: Dict) -> None: + """Validate schedule parameters strictly to ensure correct simulation configuration.""" + if not isinstance(schedule_params, dict): + raise SimulationError("schedule_params must be a dictionary.") + required_keys = {"threads", "duration", "mechanism"} + missing = required_keys - schedule_params.keys() + if missing: + raise SimulationError(f"Missing required schedule parameters: {missing}") + + threads = schedule_params.get("threads") + if not isinstance(threads, int) or threads <= 0: + raise SimulationError("'threads' must be a positive integer.") + + duration = schedule_params.get("duration") + if not isinstance(duration, (int, float)) or duration <= 0: + raise SimulationError("'duration' must be a positive number.") + + mechanism = schedule_params.get("mechanism") + if not isinstance(mechanism, str): + raise SimulationError("'mechanism' must be a string.") + + +def _simulate_time_distributions(threads: int, duration: float, mechanism: str) -> List[float]: + """Simulate time distributions based on scheduling parameters.""" + random.seed(42) + np.random.seed(42) + + base_count = max(100, threads * 50) + + if mechanism.lower() == "round_robin": + latencies = np.random.normal(loc=duration / 2, scale=duration / 10, size=base_count) + elif mechanism.lower() == "priority_queue": + latencies = np.random.lognormal(mean=np.log(duration / 3), sigma=0.5, size=base_count) + elif mechanism.lower() == "dynamic": + latencies = np.random.gamma(shape=2.0, scale=duration / 5, size=base_count) + else: + latencies = np.random.uniform(low=duration / 4, high=duration, size=base_count) + + latencies = [abs(float(x)) for x in latencies] + return latencies + + +def _calculate_outliers(values: List[float]) -> int: + """Calculate number of outliers using IQR method.""" + if not values: + return 0 + + q1 = np.percentile(values, 25) + q3 = np.percentile(values, 75) + iqr = q3 - q1 + lower_bound = q1 - 1.5 * iqr + upper_bound = q3 + 1.5 * iqr + + outliers = [v for v in values if v < lower_bound or v > upper_bound] + return len(outliers) + + +def simulate_scheduling(schedule_params: Dict) -> SimulationResults: + """Führt eine Simulation verschiedener Scheduling-Parameter durch und liefert Metriken. + + Args: + schedule_params (Dict): Parameter zur Definition des Scheduling-Verhaltens. + + Returns: + SimulationResults: Simulationsergebnisse inklusive Zeitverteilungen und Outlier-Statistik. + """ + logger.info("Starting scheduling simulation with parameters: %s", schedule_params) + + _validate_schedule_params(schedule_params) + + threads = schedule_params.get("threads") + duration = schedule_params.get("duration") + mechanism = schedule_params.get("mechanism") + + try: + latencies = _simulate_time_distributions(threads, duration, mechanism) + outliers = _calculate_outliers(latencies) + + if pd.isna(outliers): + outliers = 0 + + except Exception as e: + logger.exception("Simulation failed due to internal error.") + raise SimulationError(f"Simulation failed: {e}") from e + + assert isinstance(latencies, list), "latencies must be a list of floats" + assert all(isinstance(x, float) for x in latencies), "All latencies must be floats" + + results = SimulationResults(time_distributions=latencies, outlier_occurrences=outliers) + logger.info("Simulation completed successfully with %d outliers.", outliers) + return results