Add retry_mechanism/src/retry_mechanism/core.py

This commit is contained in:
Mika 2026-03-06 10:41:06 +00:00
commit b889140f1f

View file

@ -0,0 +1,47 @@
import time
from dataclasses import dataclass
from typing import Any
@dataclass
class TimingResult:
retry_taken: bool
retry_fixed: bool
dt: float
def to_json(self) -> dict[str, Any]:
return {
"retry_taken": self.retry_taken,
"retry_fixed": self.retry_fixed,
"dt": self.dt,
}
def retry_if_negative_dt(dt: float) -> float:
"""Überprüft ein Δt-Ergebnis und führt bei negativem Δt einmalig einen Retry nach festem Delay durch.
Args:
dt (float): Zeitdifferenzwert, der überprüft wird (kann negativ sein).
Returns:
float: Korrigierter Δt-Wert (nach Retry 0).
"""
if not isinstance(dt, (int, float)):
raise TypeError(f"Expected float for dt, got {type(dt).__name__}")
retry_taken = False
retry_fixed = False
if dt < 0:
retry_taken = True
time.sleep(0.1)
retry_dt = abs(dt)
dt = retry_dt
retry_fixed = dt >= 0
result = TimingResult(retry_taken=retry_taken, retry_fixed=retry_fixed, dt=dt)
# CI-ready assertion for data validity
assert isinstance(result.dt, (int, float)) and result.dt >= 0, "Δt must be non-negative"
return result.dt