diff --git a/ebpf_trace_instrumentation/tests/test_core.py b/ebpf_trace_instrumentation/tests/test_core.py new file mode 100644 index 0000000..b3c7bf1 --- /dev/null +++ b/ebpf_trace_instrumentation/tests/test_core.py @@ -0,0 +1,94 @@ +import pytest +import json +from pathlib import Path +from src.ebpf_trace_instrumentation import core + +@pytest.fixture +def sample_event_data(): + return { + "ktime_ns": 1234567890, + "CPU": 2, + "field_tag": "clocksource_id", + "new_value_hash": 998877, + "corr_id": "corr-001" + } + +@pytest.fixture +def invalid_event_data(): + # Missing required fields intentionally + return {"ktime_ns": "invalid", "CPU": "NaN"} + + +def test_publish_event_init_valid(sample_event_data): + ev = core.PublishEvent(**sample_event_data) + assert ev.ktime_ns == 1234567890 + assert ev.CPU == 2 + assert ev.field_tag == "clocksource_id" + assert ev.new_value_hash == 998877 + assert ev.corr_id == "corr-001" + + +def test_publish_event_init_type_validation(sample_event_data): + # Validate all types match expected + ev = core.PublishEvent(**sample_event_data) + assert isinstance(ev.ktime_ns, int) + assert isinstance(ev.CPU, int) + assert isinstance(ev.field_tag, str) + assert isinstance(ev.new_value_hash, int) + assert isinstance(ev.corr_id, str) + + +def test_add_event_appends(monkeypatch, sample_event_data): + # Mock internal collection + collected = [] + + def mock_append(e): + collected.append(e) + + monkeypatch.setattr(core, "_event_log", collected) + + core.add_event("clocksource_id", sample_event_data) + assert len(collected) == 1 + added_event = collected[0] + assert isinstance(added_event, core.PublishEvent) + assert added_event.field_tag == "clocksource_id" + + +def test_add_event_invalid_input(monkeypatch, invalid_event_data): + collected = [] + + def mock_append(e): + collected.append(e) + + monkeypatch.setattr(core, "_event_log", collected) + with pytest.raises((TypeError, ValueError)): + core.add_event("invalid", invalid_event_data) + + +def test_add_event_multiple_calls(monkeypatch, sample_event_data): + collected = [] + + def mock_append(e): + collected.append(e) + + monkeypatch.setattr(core, "_event_log", collected) + + for i in range(5): + core.add_event(f"field_{i}", sample_event_data) + + assert len(collected) == 5 + assert all(isinstance(e, core.PublishEvent) for e in collected) + + tags = [e.field_tag for e in collected] + assert tags == [f"field_{i}" for i in range(5)] + + +def test_event_json_serialization(tmp_path, sample_event_data): + ev = core.PublishEvent(**sample_event_data) + file_path = tmp_path / "event.json" + with open(file_path, "w", encoding="utf-8") as f: + json.dump(ev.__dict__, f) + + with open(file_path, encoding="utf-8") as f: + loaded = json.load(f) + assert loaded == sample_event_data \ No newline at end of file