diff --git a/queue_separation/src/queue_separation/main.py b/queue_separation/src/queue_separation/main.py new file mode 100644 index 0000000..b2edb8c --- /dev/null +++ b/queue_separation/src/queue_separation/main.py @@ -0,0 +1,106 @@ +from __future__ import annotations +import json +import random +import statistics +import time +from dataclasses import dataclass, asdict +from typing import List, Dict, Any +import logging +import pandas as pd + + +# Configure logging for CI readiness +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s' +) +logger = logging.getLogger(__name__) + + +class SimulationError(Exception): + """Custom exception for simulation errors.""" + pass + + +@dataclass +class PerformanceMetrics: + retry_tail_p99: float + bandwidth: float + hotspot_percentage: float + + def to_dict(self) -> Dict[str, float]: + return asdict(self) + + +def _validate_jobs(jobs: List[Dict[str, Any]], label: str) -> None: + if not isinstance(jobs, list): + raise ValueError(f"{label} must be a list of dicts, got {type(jobs)}") + for job in jobs: + if not isinstance(job, dict): + raise ValueError(f"Each job in {label} must be a dict, got {type(job)}") + if 'runtime' not in job: + raise ValueError(f"Each job in {label} must contain a 'runtime' key") + if not isinstance(job['runtime'], (int, float)): + raise ValueError(f"Job runtime in {label} must be numeric, got {type(job['runtime'])}") + + +def simulate_queue_separation( + jobs: List[Dict[str, Any]], + hotspot_queue: List[Dict[str, Any]], + main_queue: List[Dict[str, Any]] +) -> Dict[str, float]: + """Führt eine Simulation der Queue-Entkopplung durch und liefert Performance-Metriken. + + Args: + jobs: Liste von Job-Objekten mit Metadaten zur Laufzeit und zum Queue-Typ. + hotspot_queue: Separate Queue für Hotspot-Jobs. + main_queue: Hauptqueue für reguläre Jobs. + + Returns: + dict: JSON-kompatible Struktur mit Messwerten wie retry_tail_p99, bandwidth, hotspot_percentage. + """ + start_ts = time.time() + + try: + _validate_jobs(jobs, 'jobs') + _validate_jobs(hotspot_queue, 'hotspot_queue') + _validate_jobs(main_queue, 'main_queue') + except ValueError as e: + logger.error(f"Validation failed: {e}") + raise SimulationError(f"Input validation error: {e}") from e + + if len(jobs) == 0: + raise SimulationError("No jobs provided for simulation.") + + # Simulate variability in job latencies and queue effects + all_runtimes = [j['runtime'] for j in jobs] + retries = [r * random.uniform(0.9, 1.2) for r in all_runtimes] + + retry_tail_p99 = statistics.quantiles(retries, n=100)[-1] + + # Compute total bandwidth as inverse relation to total processing time + total_runtime = sum(all_runtimes) + bandwidth = len(jobs) / (total_runtime + 1e-6) + + hotspot_percentage = (len(hotspot_queue) / len(jobs)) * 100.0 + + # Aggregate via pandas for future extensions + df = pd.DataFrame({ + 'runtime': all_runtimes, + 'retry_runtime': retries + }) + mean_runtime = df['runtime'].mean() + logger.info(f"Mean runtime: {mean_runtime:.3f}s | Tail P99: {retry_tail_p99:.3f}s | Bandwidth: {bandwidth:.3f} jobs/s") + + metrics = PerformanceMetrics( + retry_tail_p99=retry_tail_p99, + bandwidth=bandwidth, + hotspot_percentage=hotspot_percentage + ) + + duration = time.time() - start_ts + logger.debug(f"Simulation duration: {duration:.2f}s") + + assert all(v >= 0 for v in metrics.to_dict().values()), "Metrics must be non-negative" + + return metrics.to_dict()