Add artifact_unit_tests/tests/test_trace_agg.py
This commit is contained in:
commit
b69af806c0
1 changed files with 61 additions and 0 deletions
61
artifact_unit_tests/tests/test_trace_agg.py
Normal file
61
artifact_unit_tests/tests/test_trace_agg.py
Normal file
|
|
@ -0,0 +1,61 @@
|
||||||
|
import os
|
||||||
|
import csv
|
||||||
|
import unittest
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import trace_agg
|
||||||
|
|
||||||
|
|
||||||
|
def load_sample_traces(directory_path):
|
||||||
|
"""Liest CSV-Dateien aus dem angegebenen Verzeichnis und gibt eine Liste von TraceEvent-Dictionaries zurück."""
|
||||||
|
traces = []
|
||||||
|
if not os.path.isdir(directory_path):
|
||||||
|
raise FileNotFoundError(f"Testdatendirectory nicht gefunden: {directory_path}")
|
||||||
|
for fname in os.listdir(directory_path):
|
||||||
|
if fname.endswith('.csv'):
|
||||||
|
fpath = os.path.join(directory_path, fname)
|
||||||
|
with open(fpath, newline='') as csvfile:
|
||||||
|
reader = csv.DictReader(csvfile)
|
||||||
|
for row in reader:
|
||||||
|
try:
|
||||||
|
traces.append({
|
||||||
|
'timestamp': float(row['timestamp']),
|
||||||
|
'event_type': row['event_type'],
|
||||||
|
'value': int(row['value'])
|
||||||
|
})
|
||||||
|
except (KeyError, ValueError) as e:
|
||||||
|
raise ValueError(f'Ungueltige Zeile in {fname}: {row}') from e
|
||||||
|
return traces
|
||||||
|
|
||||||
|
|
||||||
|
class TestTraceAggregation(unittest.TestCase):
|
||||||
|
"""Führt Testfälle zur Überprüfung der Aggregationslogik in trace_agg aus."""
|
||||||
|
|
||||||
|
def test_aggregation(self):
|
||||||
|
"""Validiert, dass die Aggregationsfunktion korrekte Summen liefert."""
|
||||||
|
# Ermittele Verzeichnis für Beispieldaten
|
||||||
|
data_dir = os.environ.get('TEST_DATA_DIR', os.path.join(os.path.dirname(__file__), 'data'))
|
||||||
|
if not os.path.exists(data_dir):
|
||||||
|
self.skipTest(f"Kein Testdatenverzeichnis vorhanden: {data_dir}")
|
||||||
|
|
||||||
|
# Lade Beispieldaten
|
||||||
|
traces = load_sample_traces(data_dir)
|
||||||
|
|
||||||
|
# Aggregation durchführen
|
||||||
|
results = trace_agg.aggregate_clocksource_events(traces)
|
||||||
|
|
||||||
|
# Sicherstellen, dass Filterung nur clocksource-Events enthält
|
||||||
|
for event in traces:
|
||||||
|
if event['event_type'] != 'clocksource':
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Basislogik validieren – bekannte Gesamtzahl laut Beschreibung: 496
|
||||||
|
total_value = sum(r['sum_value'] for r in results if 'sum_value' in r)
|
||||||
|
self.assertIsInstance(results, list)
|
||||||
|
self.assertGreater(len(results), 0, 'Aggregation sollte Ergebnisse liefern')
|
||||||
|
# Erwarteter Summenwert aus Testbeschreibung
|
||||||
|
self.assertEqual(total_value, 496, 'Gesamtsumme der clocksource-Events sollte 496 sein')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__, '-v'])
|
||||||
Loading…
Reference in a new issue