Add queue_separation/src/queue_separation/main.py
This commit is contained in:
parent
fccb35f861
commit
f1ecf3d5eb
1 changed files with 106 additions and 0 deletions
106
queue_separation/src/queue_separation/main.py
Normal file
106
queue_separation/src/queue_separation/main.py
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
from __future__ import annotations
|
||||
import json
|
||||
import random
|
||||
import statistics
|
||||
import time
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import List, Dict, Any
|
||||
import logging
|
||||
import pandas as pd
|
||||
|
||||
|
||||
# Configure logging for CI readiness
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s [%(levelname)s] %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SimulationError(Exception):
|
||||
"""Custom exception for simulation errors."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class PerformanceMetrics:
|
||||
retry_tail_p99: float
|
||||
bandwidth: float
|
||||
hotspot_percentage: float
|
||||
|
||||
def to_dict(self) -> Dict[str, float]:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
def _validate_jobs(jobs: List[Dict[str, Any]], label: str) -> None:
|
||||
if not isinstance(jobs, list):
|
||||
raise ValueError(f"{label} must be a list of dicts, got {type(jobs)}")
|
||||
for job in jobs:
|
||||
if not isinstance(job, dict):
|
||||
raise ValueError(f"Each job in {label} must be a dict, got {type(job)}")
|
||||
if 'runtime' not in job:
|
||||
raise ValueError(f"Each job in {label} must contain a 'runtime' key")
|
||||
if not isinstance(job['runtime'], (int, float)):
|
||||
raise ValueError(f"Job runtime in {label} must be numeric, got {type(job['runtime'])}")
|
||||
|
||||
|
||||
def simulate_queue_separation(
|
||||
jobs: List[Dict[str, Any]],
|
||||
hotspot_queue: List[Dict[str, Any]],
|
||||
main_queue: List[Dict[str, Any]]
|
||||
) -> Dict[str, float]:
|
||||
"""Führt eine Simulation der Queue-Entkopplung durch und liefert Performance-Metriken.
|
||||
|
||||
Args:
|
||||
jobs: Liste von Job-Objekten mit Metadaten zur Laufzeit und zum Queue-Typ.
|
||||
hotspot_queue: Separate Queue für Hotspot-Jobs.
|
||||
main_queue: Hauptqueue für reguläre Jobs.
|
||||
|
||||
Returns:
|
||||
dict: JSON-kompatible Struktur mit Messwerten wie retry_tail_p99, bandwidth, hotspot_percentage.
|
||||
"""
|
||||
start_ts = time.time()
|
||||
|
||||
try:
|
||||
_validate_jobs(jobs, 'jobs')
|
||||
_validate_jobs(hotspot_queue, 'hotspot_queue')
|
||||
_validate_jobs(main_queue, 'main_queue')
|
||||
except ValueError as e:
|
||||
logger.error(f"Validation failed: {e}")
|
||||
raise SimulationError(f"Input validation error: {e}") from e
|
||||
|
||||
if len(jobs) == 0:
|
||||
raise SimulationError("No jobs provided for simulation.")
|
||||
|
||||
# Simulate variability in job latencies and queue effects
|
||||
all_runtimes = [j['runtime'] for j in jobs]
|
||||
retries = [r * random.uniform(0.9, 1.2) for r in all_runtimes]
|
||||
|
||||
retry_tail_p99 = statistics.quantiles(retries, n=100)[-1]
|
||||
|
||||
# Compute total bandwidth as inverse relation to total processing time
|
||||
total_runtime = sum(all_runtimes)
|
||||
bandwidth = len(jobs) / (total_runtime + 1e-6)
|
||||
|
||||
hotspot_percentage = (len(hotspot_queue) / len(jobs)) * 100.0
|
||||
|
||||
# Aggregate via pandas for future extensions
|
||||
df = pd.DataFrame({
|
||||
'runtime': all_runtimes,
|
||||
'retry_runtime': retries
|
||||
})
|
||||
mean_runtime = df['runtime'].mean()
|
||||
logger.info(f"Mean runtime: {mean_runtime:.3f}s | Tail P99: {retry_tail_p99:.3f}s | Bandwidth: {bandwidth:.3f} jobs/s")
|
||||
|
||||
metrics = PerformanceMetrics(
|
||||
retry_tail_p99=retry_tail_p99,
|
||||
bandwidth=bandwidth,
|
||||
hotspot_percentage=hotspot_percentage
|
||||
)
|
||||
|
||||
duration = time.time() - start_ts
|
||||
logger.debug(f"Simulation duration: {duration:.2f}s")
|
||||
|
||||
assert all(v >= 0 for v in metrics.to_dict().values()), "Metrics must be non-negative"
|
||||
|
||||
return metrics.to_dict()
|
||||
Loading…
Reference in a new issue