Add integer_buckets_tests/src/integer_buckets_tests/main.py

This commit is contained in:
Mika 2025-12-12 15:42:23 +00:00
commit c1bb5f8688

View file

@ -0,0 +1,44 @@
import json
import os
import pathlib
import pytest
import integer_buckets
def test_integer_buckets(data):
"""Testet die Korrektheit und Deterministik der Integer-Buckets-Aggregation."""
# Eingabedaten validieren
assert isinstance(data, list), "data muss eine Liste von Dictionaries sein"
for rec in data:
assert isinstance(rec, dict), "Jedes Element muss ein Dictionary sein"
assert {'bucket_id', 'value', 'timestamp'} <= rec.keys(), "Felder fehlen im Datensatz"
# Führe Aggregation durch
try:
result1 = integer_buckets.aggregate(data)
result2 = integer_buckets.aggregate(data)
except Exception as e:
pytest.fail(f"Fehler beim Aggregieren der Daten: {e}")
# Ergebnis prüfen
assert isinstance(result1, dict), "Aggregationsergebnis sollte ein Dictionary sein"
assert result1 == result2, "Aggregation muss deterministisch sein"
input_sum = sum(rec['value'] for rec in data)
result_sum = sum(result1.values()) if isinstance(result1, dict) else 0
assert input_sum == result_sum, "Summen müssen übereinstimmen"
return True
def _load_synthetic_data():
data_path = pathlib.Path(__file__).parent.parent / 'tests' / 'data' / 'synthetic_trace.json'
if not data_path.exists():
pytest.skip(f"Testdaten nicht gefunden: {data_path}")
with open(data_path, 'r', encoding='utf-8') as f:
return json.load(f)
if __name__ == '__main__':
data = _load_synthetic_data()
success = test_integer_buckets(data)
print("Testergebnis:", success)