Add log_analysis/tests/test_core.py
This commit is contained in:
parent
2d2163d869
commit
0f01ad2e10
1 changed files with 70 additions and 0 deletions
70
log_analysis/tests/test_core.py
Normal file
70
log_analysis/tests/test_core.py
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
import pytest
|
||||
import pandas as pd
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from pathlib import Path
|
||||
|
||||
import src.log_analysis.core as core
|
||||
|
||||
|
||||
def make_sample_csv(tmp_path: Path, name: str, warn_rate: float, unknown_rate: float, policy_hash: str) -> str:
|
||||
data = [
|
||||
{
|
||||
'timestamp': '2024-04-01T12:00:00',
|
||||
'run_id': 'runA',
|
||||
'policy_hash': policy_hash,
|
||||
'warn_rate': warn_rate,
|
||||
'unknown_rate': unknown_rate,
|
||||
't_index_visible': 1.0,
|
||||
't_gate_read': 2.0,
|
||||
'pinned_status': 'unpinned'
|
||||
}
|
||||
]
|
||||
file_path = tmp_path / f"{name}.csv"
|
||||
pd.DataFrame(data).to_csv(file_path, index=False)
|
||||
return str(file_path)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_logs(tmp_path):
|
||||
run_new = make_sample_csv(tmp_path, 'runA', 0.12, 0.02, 'abc123')
|
||||
run_base = make_sample_csv(tmp_path, 'runB', 0.10, 0.01, 'abc123')
|
||||
return SimpleNamespace(new=run_new, base=run_base)
|
||||
|
||||
|
||||
def test_analyze_logs_basic(sample_logs):
|
||||
result = core.analyze_logs(sample_logs.new, sample_logs.base)
|
||||
assert isinstance(result, dict)
|
||||
assert set(result.keys()) == {'policy_hash', 'warn_rate', 'unknown_rate', 'timing_analysis'}
|
||||
assert result['policy_hash'] == 'MATCH: abc123'
|
||||
assert abs(result['warn_rate'] - 0.02) < 1e-6
|
||||
assert abs(result['unknown_rate'] - 0.01) < 1e-6
|
||||
assert isinstance(result['timing_analysis'], str)
|
||||
|
||||
|
||||
def test_analyze_logs_different_policy(sample_logs, tmp_path):
|
||||
run_other = make_sample_csv(tmp_path, 'runC', 0.12, 0.02, 'def999')
|
||||
result = core.analyze_logs(run_other, sample_logs.base)
|
||||
assert 'MISMATCH' in result['policy_hash']
|
||||
|
||||
|
||||
def test_analyze_logs_missing_file():
|
||||
with pytest.raises(FileNotFoundError):
|
||||
core.analyze_logs('no_such_file.csv', 'no_such_base.csv')
|
||||
|
||||
|
||||
def test_analyze_logs_edge_cases(tmp_path):
|
||||
f1 = make_sample_csv(tmp_path, 'f1', 0.0, 0.0, 'x1')
|
||||
f2 = make_sample_csv(tmp_path, 'f2', 0.0, 0.0, 'x1')
|
||||
result = core.analyze_logs(f1, f2)
|
||||
assert result['warn_rate'] == 0.0
|
||||
assert result['unknown_rate'] == 0.0
|
||||
assert isinstance(result['timing_analysis'], str)
|
||||
|
||||
|
||||
def test_result_serializable(sample_logs):
|
||||
result = core.analyze_logs(sample_logs.new, sample_logs.base)
|
||||
dumped = json.dumps(result)
|
||||
reloaded = json.loads(dumped)
|
||||
assert isinstance(reloaded, dict)
|
||||
assert 'policy_hash' in reloaded
|
||||
Loading…
Reference in a new issue