Add spike_classifier/src/spike_classifier/core.py

This commit is contained in:
Mika 2026-01-12 11:39:21 +00:00
parent d475b16d77
commit 272302699a

View file

@ -0,0 +1,73 @@
import json
from datetime import datetime
from statistics import mean
def classify_spikes(json_data: dict) -> list[dict]:
"""Analysiert und klassifiziert CPU-Spike-Events.
Args:
json_data (dict): JSON-Daten mit Spike-Events.
Returns:
list[dict]: Liste klassifizierter Spikes mit Kennwerten.
"""
if not isinstance(json_data, dict) or 'spikes' not in json_data:
raise ValueError('Invalid input JSON: expected dict with key "spikes"')
classified = []
spikes = json_data.get('spikes', [])
if not isinstance(spikes, list):
raise ValueError('Invalid spikes format: expected list')
reorder_scores = []
for s in spikes:
score = s.get('reorder_score', 0.0)
if isinstance(score, (int, float)):
reorder_scores.append(score)
avg_score = mean(reorder_scores) if reorder_scores else 0.0
for event in spikes:
spike_id = str(event.get('spike_id', 'unknown'))
cpu_path = event.get('cpu_path', '')
reorder_score = float(event.get('reorder_score', 0.0))
timestamp_raw = event.get('timestamp')
signature = event.get('signature', '')
# Timestamps validieren und konvertieren
if isinstance(timestamp_raw, str):
try:
timestamp = datetime.fromisoformat(timestamp_raw)
except ValueError:
timestamp = datetime.utcnow()
elif isinstance(timestamp_raw, datetime):
timestamp = timestamp_raw
else:
timestamp = datetime.utcnow()
# Einfache Klassifikationslogik
if reorder_score > avg_score * 1.2:
classification = 'high_reorder'
elif reorder_score < avg_score * 0.8:
classification = 'low_reorder'
else:
classification = 'normal'
# Signaturanalyse (rudimentär)
if not signature and isinstance(cpu_path, str):
if ',' in cpu_path:
signature = 'multi_cpu'
else:
signature = 'single_cpu'
classified.append({
'spike_id': spike_id,
'cpu_path': cpu_path,
'reorder_score': reorder_score,
'timestamp': timestamp.isoformat(),
'signature': signature,
'classification': classification
})
return classified