Add artifact_1/tests/test_core.py
This commit is contained in:
parent
7e48815ed5
commit
c480c59a01
1 changed files with 104 additions and 0 deletions
104
artifact_1/tests/test_core.py
Normal file
104
artifact_1/tests/test_core.py
Normal file
|
|
@ -0,0 +1,104 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Dynamically add src directory to path for imports
|
||||||
|
ROOT_DIR = Path(__file__).resolve().parents[1]
|
||||||
|
SRC_DIR = ROOT_DIR / 'src'
|
||||||
|
if SRC_DIR.exists():
|
||||||
|
sys.path.insert(0, str(SRC_DIR))
|
||||||
|
|
||||||
|
from artifact_1 import core # type: ignore
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def log_file(tmp_path):
|
||||||
|
# Patch the log path used by log_timestamps inside core if present
|
||||||
|
log_path = tmp_path / 'timestamps.json'
|
||||||
|
yield log_path
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sample_entry():
|
||||||
|
return {
|
||||||
|
'epoch_ms': int(time.time() * 1000),
|
||||||
|
'monotonic_ns': time.monotonic_ns(),
|
||||||
|
'tz_offset_minutes': 60,
|
||||||
|
'run_id': 'test_run',
|
||||||
|
'step_id': 'step_A'
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_logentry_datamodel_integrity(sample_entry):
|
||||||
|
entry = core.LogEntry(**sample_entry)
|
||||||
|
assert entry.epoch_ms == sample_entry['epoch_ms']
|
||||||
|
assert entry.monotonic_ns == sample_entry['monotonic_ns']
|
||||||
|
assert entry.tz_offset_minutes == 60
|
||||||
|
assert entry.run_id == 'test_run'
|
||||||
|
assert entry.step_id == 'step_A'
|
||||||
|
# Validate that it can be converted to dict correctly
|
||||||
|
data = entry.__dict__
|
||||||
|
assert set(data.keys()) == {'epoch_ms', 'monotonic_ns', 'tz_offset_minutes', 'run_id', 'step_id'}
|
||||||
|
|
||||||
|
def test_log_timestamps_creates_valid_json(tmp_path, sample_entry, monkeypatch):
|
||||||
|
log_path = tmp_path / 'timestamps.json'
|
||||||
|
|
||||||
|
# Patch path inside core if required
|
||||||
|
monkeypatch.setattr(core, 'LOG_PATH', log_path)
|
||||||
|
|
||||||
|
core.log_timestamps(**sample_entry)
|
||||||
|
|
||||||
|
assert log_path.exists(), 'Log file must be created.'
|
||||||
|
|
||||||
|
content = log_path.read_text().strip().splitlines()
|
||||||
|
assert content, 'Log file should contain entries.'
|
||||||
|
|
||||||
|
# Validate JSON structure
|
||||||
|
for line in content:
|
||||||
|
obj = json.loads(line)
|
||||||
|
for field in ['epoch_ms', 'monotonic_ns', 'tz_offset_minutes', 'run_id', 'step_id']:
|
||||||
|
assert field in obj, f'Missing field {field}'
|
||||||
|
assert isinstance(obj[field], (int, str)), f'Field {field} type invalid'
|
||||||
|
|
||||||
|
# Check monotonic increase simulated with successive writes
|
||||||
|
core.log_timestamps(**{
|
||||||
|
'epoch_ms': sample_entry['epoch_ms'] + 10,
|
||||||
|
'monotonic_ns': sample_entry['monotonic_ns'] + 1_000_000,
|
||||||
|
'tz_offset_minutes': sample_entry['tz_offset_minutes'],
|
||||||
|
'run_id': 'test_run',
|
||||||
|
'step_id': 'step_B'
|
||||||
|
})
|
||||||
|
new_content = log_path.read_text().strip().splitlines()
|
||||||
|
assert len(new_content) == len(content) + 1, 'New entry should be appended.'
|
||||||
|
|
||||||
|
def test_negative_or_invalid_inputs(tmp_path):
|
||||||
|
log_path = tmp_path / 'timestamps.json'
|
||||||
|
setattr(core, 'LOG_PATH', log_path)
|
||||||
|
|
||||||
|
# negative timestamp should raise AssertionError or ValueError depending on implementation
|
||||||
|
with pytest.raises((AssertionError, ValueError, TypeError)):
|
||||||
|
core.log_timestamps(-1000, time.monotonic_ns(), 0, 'id', 'step')
|
||||||
|
|
||||||
|
with pytest.raises((AssertionError, ValueError, TypeError)):
|
||||||
|
core.log_timestamps(int(time.time()*1000), -1, 0, 'id', 'step')
|
||||||
|
|
||||||
|
with pytest.raises((AssertionError, ValueError, TypeError)):
|
||||||
|
core.log_timestamps('not_int', 'not_int', 0, 'id', 'step')
|
||||||
|
|
||||||
|
def test_multiple_runs_dont_overlap(tmp_path):
|
||||||
|
log_path = tmp_path / 'timestamps.json'
|
||||||
|
setattr(core, 'LOG_PATH', log_path)
|
||||||
|
|
||||||
|
runs = [('r1', 'A'), ('r2', 'B')]
|
||||||
|
for run_id, step_id in runs:
|
||||||
|
core.log_timestamps(
|
||||||
|
int(time.time()*1000),
|
||||||
|
time.monotonic_ns(),
|
||||||
|
0,
|
||||||
|
run_id,
|
||||||
|
step_id
|
||||||
|
)
|
||||||
|
content = log_path.read_text().splitlines()
|
||||||
|
data = [json.loads(c) for c in content]
|
||||||
|
run_ids = {d['run_id'] for d in data}
|
||||||
|
assert run_ids == {'r1', 'r2'}, 'Each run_id entry must exist once.'
|
||||||
Loading…
Reference in a new issue