Add outlier_analysis/tests/test_core.py
This commit is contained in:
parent
8da7d82862
commit
e6a47c9e7c
1 changed file with 70 additions and 0 deletions
70
outlier_analysis/tests/test_core.py
Normal file
70
outlier_analysis/tests/test_core.py
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
import json
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from outlier_analysis.core import analyze_outliers
|
||||
|
||||
|
||||
def load_sample_data():
    """Return the log entries used as input by the tests in this module.

    Reads ``data/outlier_sample.json`` located next to this file. When that
    file is missing, ten synthetic entries are built instead so the tests
    can still run without the fixture file.

    Returns:
        The decoded JSON content of the sample file, or a list of ten
        synthetic entry dicts.

    Raises:
        json.JSONDecodeError: If the sample file exists but is not valid
            JSON.
    """
    sample_path = Path(__file__).parent / 'data' / 'outlier_sample.json'
    try:
        # EAFP: read directly instead of checking exists() first, so the
        # file cannot disappear between the check and the read.
        raw = sample_path.read_text(encoding='utf-8')
    except (FileNotFoundError, NotADirectoryError):
        # Fallback synthetic data: the first five entries belong to
        # stratum 'A', the rest to 'B'; the overhead grows by 10 per entry.
        return [
            {
                'corr_id': f'c{i}',
                'stratum': 'A' if i < 5 else 'B',
                'job_parallelism': 2,
                'expires_at_dist_hours': 10.0 + i,
                't_gate_read': 1.0,
                't_index_visible': 2.0,
                'retry_total_overhead_ms': float(i * 10),
                'policy_hash': 'abc',
                'setup_fingerprint': 'fp1',
            }
            for i in range(10)
        ]
    return json.loads(raw)
|
||||
|
||||
|
||||
@pytest.fixture
def log_entries():
    """Provide the sample log entries after checking their basic shape.

    The data comes from ``load_sample_data()``; the fixture fails early if
    it is not a list whose items are all dicts.
    """
    entries = load_sample_data()
    assert isinstance(entries, list)
    for entry in entries:
        assert isinstance(entry, dict)
    return entries
|
||||
|
||||
|
||||
def test_analyze_outliers_basic_stats(log_entries):
    """The analysis result is a dict exposing every summary key."""
    expected_keys = ('mean', 'median', 'p90', 'p95', 'p99', 'max', 'clusters')

    result = analyze_outliers(log_entries)

    assert isinstance(result, dict)
    # Check the keys one at a time so a failure names the missing key.
    for key in expected_keys:
        assert key in result, f"Missing key: {key}"
    cluster_list = result['clusters']
    assert isinstance(cluster_list, list)
|
||||
|
||||
|
||||
def test_mean_and_p95_values(log_entries):
    """Check sanity relations between the reported summary statistics.

    The mean is deliberately not compared against p95: a handful of extreme
    values can pull the mean above the 95th percentile (99 zeros and a
    single 1000 give mean 10 but p95 0), which is exactly the kind of data
    an outlier analysis sees. The order statistics, in contrast, must be
    monotonic: median <= p95 <= max.
    """
    result = analyze_outliers(log_entries)
    mean_value = result['mean']
    median_value = result['median']
    p95_value = result['p95']
    max_value = result['max']

    assert mean_value >= 0.0
    # NOTE(review): assumes 'median', 'p95' and 'max' all describe the same
    # metric as 'mean' -- confirm against analyze_outliers.
    assert median_value <= p95_value, "p95 should not be smaller than median"
    assert p95_value <= max_value, "p95 should not be larger than max"
|
||||
|
||||
|
||||
def test_cluster_detection_consistency(log_entries):
    """Duplicated entries yield at least one well-formed cluster."""
    # Append copies of the first two entries to enforce a detectable cluster.
    first_copy = dict(log_entries[0])
    second_copy = dict(log_entries[1])
    repeated = log_entries + [first_copy, second_copy]

    clusters = analyze_outliers(repeated)['clusters']

    assert isinstance(clusters, list)
    # Some clustering should be reported when duplicates exist.
    assert len(clusters) >= 1
    # Every cluster is a dict carrying at least one of the two known keys.
    for cluster in clusters:
        assert isinstance(cluster, dict)
        assert 'pattern' in cluster or 'members' in cluster
|
||||
|
||||
|
||||
def test_invalid_input_raises():
    """Passing a plain string instead of a list of entries must raise."""
    accepted_errors = (TypeError, KeyError, ValueError)
    with pytest.raises(accepted_errors):
        analyze_outliers('not_a_list')
|
||||
Loading…
Reference in a new issue