Add ebpf_trace_instrumentation/tests/test_core.py
This commit is contained in:
parent
68ec49022e
commit
1bdbb56a8b
1 changed files with 94 additions and 0 deletions
94
ebpf_trace_instrumentation/tests/test_core.py
Normal file
94
ebpf_trace_instrumentation/tests/test_core.py
Normal file
|
|
@ -0,0 +1,94 @@
|
||||||
|
import pytest
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from src.ebpf_trace_instrumentation import core
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sample_event_data():
|
||||||
|
return {
|
||||||
|
"ktime_ns": 1234567890,
|
||||||
|
"CPU": 2,
|
||||||
|
"field_tag": "clocksource_id",
|
||||||
|
"new_value_hash": 998877,
|
||||||
|
"corr_id": "corr-001"
|
||||||
|
}
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def invalid_event_data():
|
||||||
|
# Missing required fields intentionally
|
||||||
|
return {"ktime_ns": "invalid", "CPU": "NaN"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_publish_event_init_valid(sample_event_data):
|
||||||
|
ev = core.PublishEvent(**sample_event_data)
|
||||||
|
assert ev.ktime_ns == 1234567890
|
||||||
|
assert ev.CPU == 2
|
||||||
|
assert ev.field_tag == "clocksource_id"
|
||||||
|
assert ev.new_value_hash == 998877
|
||||||
|
assert ev.corr_id == "corr-001"
|
||||||
|
|
||||||
|
|
||||||
|
def test_publish_event_init_type_validation(sample_event_data):
|
||||||
|
# Validate all types match expected
|
||||||
|
ev = core.PublishEvent(**sample_event_data)
|
||||||
|
assert isinstance(ev.ktime_ns, int)
|
||||||
|
assert isinstance(ev.CPU, int)
|
||||||
|
assert isinstance(ev.field_tag, str)
|
||||||
|
assert isinstance(ev.new_value_hash, int)
|
||||||
|
assert isinstance(ev.corr_id, str)
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_event_appends(monkeypatch, sample_event_data):
|
||||||
|
# Mock internal collection
|
||||||
|
collected = []
|
||||||
|
|
||||||
|
def mock_append(e):
|
||||||
|
collected.append(e)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_event_log", collected)
|
||||||
|
|
||||||
|
core.add_event("clocksource_id", sample_event_data)
|
||||||
|
assert len(collected) == 1
|
||||||
|
added_event = collected[0]
|
||||||
|
assert isinstance(added_event, core.PublishEvent)
|
||||||
|
assert added_event.field_tag == "clocksource_id"
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_event_invalid_input(monkeypatch, invalid_event_data):
|
||||||
|
collected = []
|
||||||
|
|
||||||
|
def mock_append(e):
|
||||||
|
collected.append(e)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_event_log", collected)
|
||||||
|
with pytest.raises((TypeError, ValueError)):
|
||||||
|
core.add_event("invalid", invalid_event_data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_event_multiple_calls(monkeypatch, sample_event_data):
|
||||||
|
collected = []
|
||||||
|
|
||||||
|
def mock_append(e):
|
||||||
|
collected.append(e)
|
||||||
|
|
||||||
|
monkeypatch.setattr(core, "_event_log", collected)
|
||||||
|
|
||||||
|
for i in range(5):
|
||||||
|
core.add_event(f"field_{i}", sample_event_data)
|
||||||
|
|
||||||
|
assert len(collected) == 5
|
||||||
|
assert all(isinstance(e, core.PublishEvent) for e in collected)
|
||||||
|
|
||||||
|
tags = [e.field_tag for e in collected]
|
||||||
|
assert tags == [f"field_{i}" for i in range(5)]
|
||||||
|
|
||||||
|
|
||||||
|
def test_event_json_serialization(tmp_path, sample_event_data):
|
||||||
|
ev = core.PublishEvent(**sample_event_data)
|
||||||
|
file_path = tmp_path / "event.json"
|
||||||
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(ev.__dict__, f)
|
||||||
|
|
||||||
|
with open(file_path, encoding="utf-8") as f:
|
||||||
|
loaded = json.load(f)
|
||||||
|
assert loaded == sample_event_data
|
||||||
Loading…
Reference in a new issue