diff --git a/trace_analysis/tests/test_core.py b/trace_analysis/tests/test_core.py new file mode 100644 index 0000000..6d2c011 --- /dev/null +++ b/trace_analysis/tests/test_core.py @@ -0,0 +1,57 @@ +import json +import os +import tempfile +import pytest +from pathlib import Path + +from trace_analysis import core + +def create_test_event_file(tmp_path): + events = [ + {"timestamp": "2024-07-01T12:00:00", "migration_type": "unpinned", "latency": 12}, + {"timestamp": "2024-07-01T12:00:01", "migration_type": "unpinned", "latency": 35}, + {"timestamp": "2024-07-01T12:00:02", "migration_type": "half_pinned", "latency": 15}, + {"timestamp": "2024-07-01T12:00:03", "migration_type": "half_pinned", "latency": 40}, + {"timestamp": "2024-07-01T12:00:04", "migration_type": "fully_pinned", "latency": 50} + ] + file_path = tmp_path / "test_events.json" + with open(file_path, "w", encoding="utf-8") as f: + json.dump(events, f) + return file_path + + +def test_analyze_spikes_output_structure(tmp_path): + test_file = create_test_event_file(tmp_path) + report = core.analyze_spikes(str(test_file)) + + # basic type checks + assert isinstance(report, dict) + assert set(report.keys()) == {"migration_type", "timestamp", "spike_count"} + + # field type checks + assert isinstance(report["migration_type"], str) + assert isinstance(report["timestamp"], str) + assert isinstance(report["spike_count"], int) + + +def test_analyze_spikes_with_invalid_file(tmp_path): + bad_file = tmp_path / "missing.json" + with pytest.raises((FileNotFoundError, ValueError)): + core.analyze_spikes(str(bad_file)) + + +def test_analyze_spikes_counts_spikes_correctly(tmp_path): + events = [ + {"timestamp": "2024-07-01T00:00:00", "migration_type": "unpinned", "latency": 10}, + {"timestamp": "2024-07-01T00:00:01", "migration_type": "unpinned", "latency": 45}, + {"timestamp": "2024-07-01T00:00:02", "migration_type": "unpinned", "latency": 60} + ] + fpath = tmp_path / "events.json" + with open(fpath, "w", encoding="utf-8") as fh: + json.dump(events, fh) + + report = core.analyze_spikes(str(fpath)) + # Spike detection likely based on latency > threshold, expect count >= 1 + assert report["spike_count"] >= 1 + assert report["migration_type"] == "unpinned" + assert "T" in report["timestamp"] \ No newline at end of file