Add artifact.1/src/artifact_1/core.py

Mika 2026-03-31 13:46:38 +00:00
commit 401970b448


@@ -0,0 +1,112 @@
import argparse
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict

import numpy as np
import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(message)s')


@dataclass
class LogData:
    timestamp: datetime
    aux_worker: int
    p99_tail: float
    band_width: float

    @staticmethod
    def validate(entry: Dict[str, Any]) -> bool:
        required_fields = ["timestamp", "aux_worker", "p99_tail", "band_width"]
        for field in required_fields:
            if field not in entry:
                raise ValueError(f"Missing required field: {field}")
        # pandas hands back numpy scalars (e.g. np.int64), which are not
        # subclasses of the builtin int, so accept both.
        if not isinstance(entry["aux_worker"], (int, np.integer)):
            raise TypeError("Field 'aux_worker' must be int")
        if not isinstance(entry["p99_tail"], (int, float, np.integer, np.floating)):
            raise TypeError("Field 'p99_tail' must be numeric")
        if not isinstance(entry["band_width"], (int, float, np.integer, np.floating)):
            raise TypeError("Field 'band_width' must be numeric")
        ts = entry["timestamp"]
        if not isinstance(ts, str):
            raise TypeError("Field 'timestamp' must be an ISO 8601 string")
        try:
            # datetime.fromisoformat() cannot parse a trailing 'Z', so map it
            # to the equivalent '+00:00' offset first.
            datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except ValueError as e:
            raise ValueError(f"Invalid timestamp format: {ts} ({e})") from e
        return True


def analyze_logs(log_file_path: str) -> Dict[str, Any]:
    """Analyze log files from different aux-worker configurations.

    Args:
        log_file_path (str): Path to the JSON log file.

    Returns:
        Dict[str, Any]: Aggregated results per aux_worker (median, IQR, etc.).
    """
    log_path = Path(log_file_path)
    if not log_path.exists():
        raise FileNotFoundError(f"Log file not found: {log_file_path}")
    logging.info(f"Reading log file from {log_file_path}")
    try:
        # convert_dates=False keeps "timestamp" as a string; by default pandas
        # auto-converts a column of that name to datetime64, which would break
        # the string parsing in LogData.validate().
        df = pd.read_json(log_path, convert_dates=False)
    except ValueError as e:
        raise ValueError(f"Invalid JSON format in {log_file_path}: {e}") from e
    expected_cols = {"timestamp", "aux_worker", "p99_tail", "band_width"}
    if not expected_cols.issubset(df.columns):
        missing = expected_cols - set(df.columns)
        raise ValueError(f"Missing columns in log data: {missing}")
    # Validate each row explicitly
    for _, row in df.iterrows():
        LogData.validate(row.to_dict())
    # Compute aggregates
    logging.info("Computing median and IQR per aux_worker")
    result = {}
    grouped = df.groupby("aux_worker")
    for aux, group in grouped:
        summary = {}
        for metric in ["p99_tail", "band_width"]:
            median_val = float(group[metric].median())
            q75, q25 = group[metric].quantile([0.75, 0.25])
            iqr_val = float(q75 - q25)
            summary[metric] = {"median": median_val, "iqr": iqr_val}
        result[int(aux)] = summary
    logging.info("Analysis complete.")
    return result


def _save_output(results: Dict[str, Any], output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        # json.dump serialises the int aux_worker keys as strings.
        json.dump(results, f, indent=2)
    logging.info(f"Analysis summary written to {output_path}")


def main() -> None:
    parser = argparse.ArgumentParser(description="Analyse aux-worker log data.")
    parser.add_argument("--log-file", required=True, help="Path to the log file (JSON format)")
    parser.add_argument(
        "--output",
        required=False,
        default="output/analysis_summary.json",
        help="Path to the output file (JSON)",
    )
    args = parser.parse_args()
    results = analyze_logs(args.log_file)
    output_path = Path(args.output)
    _save_output(results, output_path)


if __name__ == "__main__":
    main()
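
For a quick end-to-end check, a minimal usage sketch follows. The import path artifact_1.core is assumed from the file's location under src/, and the sample records and file name are fabricated for illustration:

import json

from artifact_1.core import analyze_logs  # assumed import path (src layout)

# Fabricated sample records matching the schema LogData.validate() expects.
records = [
    {"timestamp": "2026-03-31T13:00:00Z", "aux_worker": 2, "p99_tail": 41.5, "band_width": 0.8},
    {"timestamp": "2026-03-31T13:01:00Z", "aux_worker": 2, "p99_tail": 44.0, "band_width": 0.9},
    {"timestamp": "2026-03-31T13:02:00Z", "aux_worker": 4, "p99_tail": 38.2, "band_width": 1.1},
]
with open("sample_log.json", "w", encoding="utf-8") as f:
    json.dump(records, f)

# Prints e.g. {2: {'p99_tail': {'median': 42.75, 'iqr': 1.25}, ...}, 4: {...}}
print(analyze_logs("sample_log.json"))

Assuming the package is installed, the same file can also be fed to the CLI via python -m artifact_1.core --log-file sample_log.json, which writes output/analysis_summary.json by default.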