Add trace_analysis/tests/test_core.py
This commit is contained in:
parent
2b530d98d8
commit
366b66fce7
1 changed files with 57 additions and 0 deletions
57
trace_analysis/tests/test_core.py
Normal file
57
trace_analysis/tests/test_core.py
Normal file
|
|
@ -0,0 +1,57 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from trace_analysis import core
|
||||||
|
|
||||||
|
def create_test_event_file(tmp_path):
|
||||||
|
events = [
|
||||||
|
{"timestamp": "2024-07-01T12:00:00", "migration_type": "unpinned", "latency": 12},
|
||||||
|
{"timestamp": "2024-07-01T12:00:01", "migration_type": "unpinned", "latency": 35},
|
||||||
|
{"timestamp": "2024-07-01T12:00:02", "migration_type": "half_pinned", "latency": 15},
|
||||||
|
{"timestamp": "2024-07-01T12:00:03", "migration_type": "half_pinned", "latency": 40},
|
||||||
|
{"timestamp": "2024-07-01T12:00:04", "migration_type": "fully_pinned", "latency": 50}
|
||||||
|
]
|
||||||
|
file_path = tmp_path / "test_events.json"
|
||||||
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(events, f)
|
||||||
|
return file_path
|
||||||
|
|
||||||
|
|
||||||
|
def test_analyze_spikes_output_structure(tmp_path):
|
||||||
|
test_file = create_test_event_file(tmp_path)
|
||||||
|
report = core.analyze_spikes(str(test_file))
|
||||||
|
|
||||||
|
# basic type checks
|
||||||
|
assert isinstance(report, dict)
|
||||||
|
assert set(report.keys()) == {"migration_type", "timestamp", "spike_count"}
|
||||||
|
|
||||||
|
# field type checks
|
||||||
|
assert isinstance(report["migration_type"], str)
|
||||||
|
assert isinstance(report["timestamp"], str)
|
||||||
|
assert isinstance(report["spike_count"], int)
|
||||||
|
|
||||||
|
|
||||||
|
def test_analyze_spikes_with_invalid_file(tmp_path):
|
||||||
|
bad_file = tmp_path / "missing.json"
|
||||||
|
with pytest.raises((FileNotFoundError, ValueError)):
|
||||||
|
core.analyze_spikes(str(bad_file))
|
||||||
|
|
||||||
|
|
||||||
|
def test_analyze_spikes_counts_spikes_correctly(tmp_path):
|
||||||
|
events = [
|
||||||
|
{"timestamp": "2024-07-01T00:00:00", "migration_type": "unpinned", "latency": 10},
|
||||||
|
{"timestamp": "2024-07-01T00:00:01", "migration_type": "unpinned", "latency": 45},
|
||||||
|
{"timestamp": "2024-07-01T00:00:02", "migration_type": "unpinned", "latency": 60}
|
||||||
|
]
|
||||||
|
fpath = tmp_path / "events.json"
|
||||||
|
with open(fpath, "w", encoding="utf-8") as fh:
|
||||||
|
json.dump(events, fh)
|
||||||
|
|
||||||
|
report = core.analyze_spikes(str(fpath))
|
||||||
|
# Spike detection likely based on latency > threshold, expect count >= 1
|
||||||
|
assert report["spike_count"] >= 1
|
||||||
|
assert report["migration_type"] == "unpinned"
|
||||||
|
assert "T" in report["timestamp"]
|
||||||
Loading…
Reference in a new issue