import argparse
import json
import math
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List

import pandas as pd


@dataclass
class TimingData:
    """Data model for a single timing log entry."""

    # Hours until the entry expires.
    expires_at_dist_hours: float
    # Observed time delta; may be negative (treated as an anomaly downstream).
    delta_t: float
    # Whether the entry was pinned.
    pinned: bool
    # Whether the entry was unpinned.
    unpinned: bool

    @classmethod
    def from_dict(cls, item: Dict[str, Any]) -> 'TimingData':
        """Build a TimingData from a raw dict, validating required fields.

        Raises:
            ValueError: if any required field is missing from *item*.
        """
        required_fields = ('expires_at_dist_hours', 'delta_t', 'pinned', 'unpinned')
        for f in required_fields:
            if f not in item:
                raise ValueError(f"Missing required field: {f}")
        return cls(
            expires_at_dist_hours=float(item['expires_at_dist_hours']),
            delta_t=float(item['delta_t']),
            pinned=bool(item['pinned']),
            unpinned=bool(item['unpinned']),
        )


def _json_safe(value: Any) -> Any:
    """Recursively convert a summary structure to plain JSON-serializable types.

    pandas aggregations yield numpy scalars (np.float64, np.int64, np.bool_),
    which ``json.dump`` rejects with a TypeError; NaN (e.g. the std of a
    single-row group) would otherwise be emitted as non-standard ``NaN``.
    numpy scalars are unboxed via ``.item()`` and NaN becomes None (JSON null).
    """
    if isinstance(value, dict):
        return {k: _json_safe(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_json_safe(v) for v in value]
    # numpy scalar types all expose .item(); plain Python scalars do not.
    if hasattr(value, 'item'):
        value = value.item()
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def analyze_timing_data(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyse timing log data: Δt stats, anomalies, near-expiry heuristic.

    Args:
        data: list of dicts, each carrying 'expires_at_dist_hours', 'delta_t',
            'pinned' and 'unpinned'.

    Returns:
        A JSON-serializable summary dict (plain Python types only; NaN values
        are mapped to None).

    Raises:
        TypeError: if *data* is not a list.
        ValueError: if an entry is missing a required field or a numeric
            column ends up with a non-numeric dtype.
    """
    if not isinstance(data, list):
        raise TypeError("Input data must be a list of dictionaries.")

    # Empty input: a column-less DataFrame would make every df[col] lookup
    # below raise KeyError, so return an explicit empty summary instead.
    if not data:
        return {
            'negative_dt_count': 0,
            'negative_dt_percentage': 0.0,
            'group_stats': [],
            'recommended_near_expiry_threshold_hours': 48,
            'total_records': 0,
            'mean_delta_t': None,
            'std_delta_t': None,
        }

    parsed = [TimingData.from_dict(d) for d in data]
    df = pd.DataFrame([d.__dict__ for d in parsed])

    # Validation: ensure numeric columns (non-numeric input would silently
    # poison every aggregation below).
    for col in ('expires_at_dist_hours', 'delta_t'):
        if not pd.api.types.is_numeric_dtype(df[col]):
            raise ValueError(f"Column {col} must be numeric.")

    summary: Dict[str, Any] = {}

    # Negative Δt detection — deltas are expected to be >= 0, so negatives
    # are flagged as anomalies.
    negative_dt = df[df['delta_t'] < 0]
    summary['negative_dt_count'] = len(negative_dt)
    summary['negative_dt_percentage'] = len(negative_dt) / len(df) * 100.0

    # Aggregations per (pinned, unpinned) combination.
    group_stats = (
        df.groupby(['pinned', 'unpinned'])['delta_t']
        .agg(['mean', 'std', 'count'])
        .reset_index()
    )
    summary['group_stats'] = group_stats.to_dict(orient='records')

    # Near-expiry threshold recommendation (heuristic): if more than half of
    # the sub-48h entries are already below 24h, tighten the threshold to 24h.
    near_expiry_df = df[df['expires_at_dist_hours'] < 48]
    if len(near_expiry_df) > 0:
        share_below_24 = float((near_expiry_df['expires_at_dist_hours'] < 24).mean())
    else:
        share_below_24 = 0.0
    summary['recommended_near_expiry_threshold_hours'] = 24 if share_below_24 > 0.5 else 48

    # Overall stats.
    summary['total_records'] = len(df)
    summary['mean_delta_t'] = df['delta_t'].mean()
    summary['std_delta_t'] = df['delta_t'].std()

    # Unbox numpy scalars / map NaN so json.dump on the result cannot fail.
    return _json_safe(summary)


def main() -> None:
    """CLI entry point: read a timing JSON file, write the analysis summary."""
    parser = argparse.ArgumentParser(description='Analyze timing JSON data and produce summary.')
    parser.add_argument('--input', required=True, help='Pfad zur JSON-Eingabedatei mit Timing-Daten')
    parser.add_argument('--output', required=True, help='Pfad zur JSON-Datei mit Analyseergebnissen')
    args = parser.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)

    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    with input_path.open('r', encoding='utf-8') as f:
        data = json.load(f)

    result = analyze_timing_data(data)

    # Create the output directory lazily so the tool works on fresh paths.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open('w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)


if __name__ == '__main__':
    main()