Add artifact-1/src/artifact_1/main.py

This commit is contained in:
Mika 2026-03-02 14:26:30 +00:00
commit 0ce3032a87

View file

@@ -0,0 +1,92 @@
import argparse
import json
from pathlib import Path
from typing import Any, Dict, List
import pandas as pd
from dataclasses import dataclass
@dataclass
class TimingData:
    """Data model for timing log entries.

    Instances hold already-converted values: the two numeric fields are
    built-in floats and the two flags are built-in bools.
    """

    # Presumably the distance to expiry in hours (inferred from the name;
    # confirm with the producer of the log).
    expires_at_dist_hours: float
    # Measured Δt value; its unit is not specified in this module.
    delta_t: float
    pinned: bool
    unpinned: bool

    @staticmethod
    def _to_bool(value: Any) -> bool:
        """Coerce *value* to bool, understanding textual false spellings.

        Plain ``bool("false")`` is True because any non-empty string is
        truthy, so strings are compared against common "false" spellings
        first. Every other value keeps ordinary Python truthiness.
        """
        if isinstance(value, str):
            return value.strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off')
        return bool(value)

    @staticmethod
    def from_dict(item: Dict[str, Any]) -> 'TimingData':
        """Build a ``TimingData`` from a raw record.

        Args:
            item: Mapping that must contain the keys
                ``expires_at_dist_hours``, ``delta_t``, ``pinned`` and
                ``unpinned``.

        Returns:
            A ``TimingData`` with the numeric fields passed through
            ``float()`` and the flags coerced to bool.

        Raises:
            ValueError: If a required key is missing (the first missing key,
                in the order listed above, is named in the message).
                Errors raised by ``float()`` for unconvertible values
                propagate unchanged.
        """
        required_fields = ('expires_at_dist_hours', 'delta_t', 'pinned', 'unpinned')
        for name in required_fields:
            if name not in item:
                raise ValueError(f"Missing required field: {name}")
        return TimingData(
            expires_at_dist_hours=float(item['expires_at_dist_hours']),
            delta_t=float(item['delta_t']),
            pinned=TimingData._to_bool(item['pinned']),
            unpinned=TimingData._to_bool(item['unpinned']),
        )
def analyze_timing_data(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyse timing log data: Δt statistics, negative-Δt anomalies, near-expiry threshold.

    Args:
        data: List of raw records; each one is converted with
            ``TimingData.from_dict``.

    Returns:
        A dict with these keys, in this order:

        * ``negative_dt_count`` / ``negative_dt_percentage`` - number and
          share (0-100) of records whose ``delta_t`` is below zero.
        * ``group_stats`` - one record per observed ``(pinned, unpinned)``
          combination with ``mean``, ``std`` and ``count`` of ``delta_t``.
        * ``recommended_near_expiry_threshold_hours`` - 24 when more than
          half of the records below 48 h are also below 24 h, else 48.
        * ``total_records``, ``mean_delta_t``, ``std_delta_t``.

        For an empty list the counts are 0, ``group_stats`` is empty, the
        threshold is 48 and mean/std are NaN. ``std`` values are NaN
        whenever they are computed from a single record.

    Raises:
        TypeError: If ``data`` is not a list.
        ValueError: If a record lacks a required field (raised by
            ``TimingData.from_dict``) or a numeric column is not numeric.
    """
    if not isinstance(data, list):
        raise TypeError("Input data must be a list of dictionaries.")

    parsed = [TimingData.from_dict(d) for d in data]

    # An empty list yields a DataFrame without any columns, so every column
    # lookup below would raise KeyError. Return the neutral summary instead.
    if not parsed:
        return {
            'negative_dt_count': 0,
            'negative_dt_percentage': 0.0,
            'group_stats': [],
            'recommended_near_expiry_threshold_hours': 48,
            'total_records': 0,
            'mean_delta_t': float('nan'),
            'std_delta_t': float('nan'),
        }

    df = pd.DataFrame([vars(entry) for entry in parsed])
    total = len(df)

    # Validation: ensure numeric columns.
    for col in ['expires_at_dist_hours', 'delta_t']:
        if not pd.api.types.is_numeric_dtype(df[col]):
            raise ValueError(f"Column {col} must be numeric.")

    summary: Dict[str, Any] = {}

    # Negative Δt detection
    negative_count = int((df['delta_t'] < 0).sum())
    summary['negative_dt_count'] = negative_count
    summary['negative_dt_percentage'] = negative_count / total * 100

    # Aggregations by pinned/unpinned
    group_stats = (
        df.groupby(['pinned', 'unpinned'])['delta_t']
        .agg(['mean', 'std', 'count'])
        .reset_index()
    )
    summary['group_stats'] = group_stats.to_dict(orient='records')

    # Near-expiry threshold recommendation (heuristic): look only at records
    # closer than 48 h and pick 24 h if most of those are below 24 h as well.
    near_expiry = df.loc[df['expires_at_dist_hours'] < 48, 'expires_at_dist_hours']
    share_below_24 = (near_expiry < 24).mean() if len(near_expiry) > 0 else 0.0
    summary['recommended_near_expiry_threshold_hours'] = 24 if share_below_24 > 0.5 else 48

    # Overall stats, converted to built-in floats for downstream consumers.
    summary['total_records'] = total
    summary['mean_delta_t'] = float(df['delta_t'].mean())
    summary['std_delta_t'] = float(df['delta_t'].std())
    return summary
def _json_safe(value: Any) -> Any:
    """Return *value* with non-finite floats replaced by ``None``, recursively.

    ``json.dump`` writes NaN/Infinity as bare ``NaN``/``Infinity`` tokens,
    which are not valid JSON; mapping them to ``None`` yields ``null``.
    """
    import math

    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {key: _json_safe(inner) for key, inner in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_safe(inner) for inner in value]
    return value


def main() -> None:
    """Command-line entry point: read timing JSON, analyse it, write the summary.

    Reads the file given by ``--input``, passes the decoded JSON to
    ``analyze_timing_data`` and writes the result as indented UTF-8 JSON to
    ``--output``, creating the parent directory if needed. Undefined
    statistics (NaN, e.g. the standard deviation of a single record) are
    written as ``null`` so the output stays valid JSON.

    Raises:
        FileNotFoundError: If the input path does not exist.
    """
    parser = argparse.ArgumentParser(description='Analyze timing JSON data and produce summary.')
    parser.add_argument('--input', required=True, help='Pfad zur JSON-Eingabedatei mit Timing-Daten')
    parser.add_argument('--output', required=True, help='Pfad zur JSON-Datei mit Analyseergebnissen')
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    with input_path.open('r', encoding='utf-8') as f:
        data = json.load(f)

    result = analyze_timing_data(data)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open('w', encoding='utf-8') as f:
        json.dump(_json_safe(result), f, indent=2, ensure_ascii=False)


# Run the CLI only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()