Add artifact1/src/artifact1/core.py
This commit is contained in:
commit
4b8fe96731
1 changed files with 101 additions and 0 deletions
101
artifact1/src/artifact1/core.py
Normal file
101
artifact1/src/artifact1/core.py
Normal file
|
|
@ -0,0 +1,101 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Dict
|
||||||
|
from statistics import mean, stdev
|
||||||
|
|
||||||
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||||||
|
# Set this module's logger threshold to INFO when the module is imported.
# NOTE(review): a level set here takes effect at import time regardless of
# how the embedding application configures logging -- confirm this is intended.
logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
|
||||||
|
class MetricsValidationError(Exception):
    """Raised when performance-metrics input data fails validation."""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class PerformanceData:
    """One validated record of performance metrics.

    Instances built through :meth:`from_dict` hold values that have been
    coerced to the declared field types.
    """

    max_only_alerts: int
    outlier_frequency: float
    expires_at_dist_hours: float
    retry_total_overhead: float

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> PerformanceData:
        """Build a record from a mapping, validating and coercing each field.

        Keys other than the four required fields are ignored.

        Args:
            data: Mapping that must contain every required field.

        Returns:
            A ``PerformanceData`` whose attributes hold the coerced values.

        Raises:
            MetricsValidationError: If ``data`` does not support key lookup,
                a required field is missing, or a value cannot be converted
                to the declared type without losing information.
        """
        required_fields = {
            'max_only_alerts': int,
            'outlier_frequency': float,
            'expires_at_dist_hours': float,
            'retry_total_overhead': float,
        }
        coerced: Dict[str, Any] = {}
        for name, expected in required_fields.items():
            try:
                if name not in data:
                    raise MetricsValidationError(f"Missing required field: {name}")
                raw = data[name]
            except TypeError as err:
                # Entries such as None or a bare number support neither
                # membership tests nor key lookup.
                raise MetricsValidationError(
                    f"Metric entry must be a mapping, got {type(data).__name__}"
                ) from err

            try:
                value = expected(raw)
            except (ValueError, TypeError, OverflowError) as err:
                # OverflowError covers int(float('inf')).
                raise MetricsValidationError(
                    f"Invalid type for field {name}: expected {expected.__name__}, got {type(raw).__name__}"
                ) from err

            # int() silently truncates floats; the string "3.7" is already
            # rejected by int(), so reject the float 3.7 as well.
            if expected is int and isinstance(raw, float) and raw != value:
                raise MetricsValidationError(
                    f"Invalid value for field {name}: expected an integral number, got {raw!r}"
                )

            # Store the coerced value, not the raw input, so downstream
            # numeric code never sees e.g. the string "0.5".
            coerced[name] = value
        return cls(**coerced)
|
||||||
|
|
||||||
|
|
||||||
|
def analyze_metrics(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze performance data and build aggregated analysis results.

    Metric entries are read from ``data['runs']`` or, failing that, from
    ``data['metrics']``. Each entry is validated through
    ``PerformanceData.from_dict`` before aggregation.

    Args:
        data: Incoming JSON-like dictionary containing performance metrics.

    Returns:
        Dict with three keys:
        ``'status'`` (``'ok'`` or ``'empty'``),
        ``'aggregates'`` (per metric: mean, population standard deviation,
        min and max), and
        ``'anomalies'`` (per metric with outliers: the outlier count and the
        lower/upper thresholds used).

    Raises:
        MetricsValidationError: If ``data`` is not a dict, no list of entries
            is found under ``'runs'`` or ``'metrics'``, or an entry fails
            validation.
    """
    if not isinstance(data, dict):
        raise MetricsValidationError("Input data must be a dictionary.")

    candidates = (data.get('runs'), data.get('metrics'))
    # Prefer the first non-empty value, as before. If there is none, accept
    # an explicitly supplied empty list so that "no runs" produces the
    # 'empty' status instead of a validation error.
    metrics = next((c for c in candidates if c), None)
    if metrics is None:
        metrics = next((c for c in candidates if isinstance(c, list)), None)

    if not isinstance(metrics, list):
        raise MetricsValidationError("Expected a list of metric entries under 'runs' or 'metrics'.")

    try:
        validated_records = [vars(PerformanceData.from_dict(entry)) for entry in metrics]
    except MetricsValidationError as e:
        logger.error("Failed to validate input data: %s", e)
        raise

    if not validated_records:
        return {'status': 'empty', 'aggregates': {}, 'anomalies': []}

    df = pd.DataFrame(validated_records)

    aggregates: Dict[str, Any] = {}
    for column in df.columns:
        col_data = df[column].dropna()
        if col_data.empty:
            continue
        aggregates[column] = {
            'mean': float(col_data.mean()),
            'std': float(col_data.std(ddof=0)),  # population standard deviation
            'min': float(col_data.min()),
            'max': float(col_data.max()),
        }

    anomalies = []
    for col, stats in aggregates.items():
        mu, sigma = stats['mean'], stats['std']
        if sigma > 0:
            lower_limit, upper_limit = mu - 3 * sigma, mu + 3 * sigma
        else:
            # Zero spread: fall back to a +/-30% band around the mean.
            # Sorting keeps lower <= upper when the mean is negative;
            # otherwise every row would be flagged.
            lower_limit, upper_limit = sorted((mu * 0.7, mu * 1.3))
        outliers = df[(df[col] > upper_limit) | (df[col] < lower_limit)]
        if not outliers.empty:
            anomalies.append({
                'metric': col,
                'count': int(len(outliers)),
                'thresholds': {
                    'lower': lower_limit,
                    'upper': upper_limit,
                },
            })

    return {'status': 'ok', 'aggregates': aggregates, 'anomalies': anomalies}
|
||||||
Loading…
Reference in a new issue