Add simulation_tool/src/simulation_tool/core.py

This commit is contained in:
Mika 2026-03-16 13:59:12 +00:00
parent f32269e351
commit 53afd383cc

View file

@ -0,0 +1,111 @@
from __future__ import annotations
import logging
import random
import statistics
from typing import Dict, List
import numpy as np
import pandas as pd
from simulation_tool.models import SimulationResults
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class SimulationError(Exception):
"""Custom Exception for any simulation-level error."""
pass
def _validate_schedule_params(schedule_params: Dict) -> None:
"""Validate schedule parameters strictly to ensure correct simulation configuration."""
if not isinstance(schedule_params, dict):
raise SimulationError("schedule_params must be a dictionary.")
required_keys = {"threads", "duration", "mechanism"}
missing = required_keys - schedule_params.keys()
if missing:
raise SimulationError(f"Missing required schedule parameters: {missing}")
threads = schedule_params.get("threads")
if not isinstance(threads, int) or threads <= 0:
raise SimulationError("'threads' must be a positive integer.")
duration = schedule_params.get("duration")
if not isinstance(duration, (int, float)) or duration <= 0:
raise SimulationError("'duration' must be a positive number.")
mechanism = schedule_params.get("mechanism")
if not isinstance(mechanism, str):
raise SimulationError("'mechanism' must be a string.")
def _simulate_time_distributions(threads: int, duration: float, mechanism: str) -> List[float]:
"""Simulate time distributions based on scheduling parameters."""
random.seed(42)
np.random.seed(42)
base_count = max(100, threads * 50)
if mechanism.lower() == "round_robin":
latencies = np.random.normal(loc=duration / 2, scale=duration / 10, size=base_count)
elif mechanism.lower() == "priority_queue":
latencies = np.random.lognormal(mean=np.log(duration / 3), sigma=0.5, size=base_count)
elif mechanism.lower() == "dynamic":
latencies = np.random.gamma(shape=2.0, scale=duration / 5, size=base_count)
else:
latencies = np.random.uniform(low=duration / 4, high=duration, size=base_count)
latencies = [abs(float(x)) for x in latencies]
return latencies
def _calculate_outliers(values: List[float]) -> int:
"""Calculate number of outliers using IQR method."""
if not values:
return 0
q1 = np.percentile(values, 25)
q3 = np.percentile(values, 75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outliers = [v for v in values if v < lower_bound or v > upper_bound]
return len(outliers)
def simulate_scheduling(schedule_params: Dict) -> SimulationResults:
"""Führt eine Simulation verschiedener Scheduling-Parameter durch und liefert Metriken.
Args:
schedule_params (Dict): Parameter zur Definition des Scheduling-Verhaltens.
Returns:
SimulationResults: Simulationsergebnisse inklusive Zeitverteilungen und Outlier-Statistik.
"""
logger.info("Starting scheduling simulation with parameters: %s", schedule_params)
_validate_schedule_params(schedule_params)
threads = schedule_params.get("threads")
duration = schedule_params.get("duration")
mechanism = schedule_params.get("mechanism")
try:
latencies = _simulate_time_distributions(threads, duration, mechanism)
outliers = _calculate_outliers(latencies)
if pd.isna(outliers):
outliers = 0
except Exception as e:
logger.exception("Simulation failed due to internal error.")
raise SimulationError(f"Simulation failed: {e}") from e
assert isinstance(latencies, list), "latencies must be a list of floats"
assert all(isinstance(x, float) for x in latencies), "All latencies must be floats"
results = SimulationResults(time_distributions=latencies, outlier_occurrences=outliers)
logger.info("Simulation completed successfully with %d outliers.", outliers)
return results