Add run_summary/tests/test_core.py

This commit is contained in:
Mika 2026-01-25 17:42:36 +00:00
parent 5305a248a0
commit b775cdfee6

View file

@ -0,0 +1,63 @@
import pytest
import json
from pathlib import Path
from run_summary import core
@pytest.fixture
def sample_raw_events():
# Synthetic RawEvent data
return [
{"corr_id": "a1", "metric_value": 10.0, "timestamp": "2024-06-01T10:00:00Z"},
{"corr_id": "a2", "metric_value": 15.0, "timestamp": "2024-06-01T10:01:00Z"},
{"corr_id": "a3", "metric_value": 20.0, "timestamp": "2024-06-01T10:02:00Z"},
{"corr_id": "a4", "metric_value": 12.0, "timestamp": "2024-06-01T10:03:00Z"},
{"corr_id": "a5", "metric_value": 18.0, "timestamp": "2024-06-01T10:04:00Z"}
]
def test_generate_summary_returns_expected_keys(sample_raw_events):
summary = core.generate_summary(sample_raw_events)
assert isinstance(summary, dict)
assert set(summary.keys()) == {"mischfenster_p95", "retry_free_in_window_rate"}
assert isinstance(summary["mischfenster_p95"], (float, int))
assert isinstance(summary["retry_free_in_window_rate"], (float, int))
def test_generate_summary_with_invalid_data():
invalid_data = [{"metric_value": "not_a_number", "corr_id": 123}]
with pytest.raises((ValueError, TypeError, AssertionError)):
core.generate_summary(invalid_data)
def test_make_gate_decision_typical_case(sample_raw_events):
summary = core.generate_summary(sample_raw_events)
decision = core.make_gate_decision(summary)
assert isinstance(decision, bool)
def test_make_gate_decision_edge_cases():
low_summary = {"mischfenster_p95": 5.0, "retry_free_in_window_rate": 1.0}
high_summary = {"mischfenster_p95": 200.0, "retry_free_in_window_rate": 0.0}
assert core.make_gate_decision(low_summary) in (True, False)
assert core.make_gate_decision(high_summary) in (True, False)
def test_export_debug_artifact_creates_file(tmp_path, sample_raw_events):
output_path = tmp_path / "debug.json"
result_path = core.export_debug_artifact(sample_raw_events, str(output_path))
assert Path(result_path).exists()
with open(result_path, "r", encoding="utf-8") as f:
data = json.load(f)
assert isinstance(data, list)
def test_export_debug_artifact_with_empty_input(tmp_path):
output_path = tmp_path / "debug_empty.json"
result_path = core.export_debug_artifact([], str(output_path))
assert Path(result_path).exists()
with open(result_path, "r", encoding="utf-8") as f:
data = json.load(f)
assert data == []