From e8acae0be40a6032cc7a197a7ed124840014d695 Mon Sep 17 00:00:00 2001
From: Mika
Date: Mon, 23 Feb 2026 14:48:36 +0000
Subject: [PATCH] Add log_analysis/src/log_analysis/core.py

---
 log_analysis/src/log_analysis/core.py | 129 +++++++++++++++++++++++++++++
 1 file changed, 129 insertions(+)
 create mode 100644 log_analysis/src/log_analysis/core.py

diff --git a/log_analysis/src/log_analysis/core.py b/log_analysis/src/log_analysis/core.py
new file mode 100644
index 0000000..9594807
--- /dev/null
+++ b/log_analysis/src/log_analysis/core.py
@@ -0,0 +1,129 @@
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Dict
+
+import pandas as pd
+
+# Configure module logger
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+@dataclass
+class LogEntry:
+    timestamp: str
+    run_id: str
+    policy_hash: str
+    warn_rate: float
+    unknown_rate: float
+    t_index_visible: float
+    t_gate_read: float
+    pinned_status: str
+
+
+@dataclass
+class AnalysisResult:
+    policy_hash: str
+    warn_rate: float
+    unknown_rate: float
+    timing_analysis: str
+
+
+class LogAnalysisError(Exception):
+    """Custom exception for log analysis errors."""
+
+
+def _read_log_file(file_path: str) -> pd.DataFrame:
+    """Load a CSV or JSON log file and validate its schema."""
+    path = Path(file_path)
+    if not path.exists():
+        raise LogAnalysisError(f"Log file not found: {file_path}")
+    if path.suffix.lower() == '.csv':
+        df = pd.read_csv(path)
+    elif path.suffix.lower() == '.json':
+        df = pd.read_json(path)
+    else:
+        raise LogAnalysisError(f"Unsupported file format: {path.suffix}")
+
+    # Validate required columns
+    required_cols = {
+        'timestamp', 'run_id', 'policy_hash', 'warn_rate', 'unknown_rate',
+        't_index_visible', 't_gate_read', 'pinned_status'
+    }
+    missing = required_cols - set(df.columns)
+    if missing:
+        raise LogAnalysisError(f"Missing columns in {file_path}: {missing}")
+
+    logger.info("Loaded log file %s with %d entries.", file_path, len(df))
+    return df
+
+
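+# For reference, the expected log layout, shown with made-up values: the
+# column names match required_cols above, but the rows below are purely
+# illustrative and not taken from any real run.
+#
+#   timestamp,run_id,policy_hash,warn_rate,unknown_rate,t_index_visible,t_gate_read,pinned_status
+#   2026-02-23T14:00:00Z,run-001,abc123,0.02,0.01,0.1342,0.0871,pinned
+
+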
+def analyze_logs(log_file: str, comparison_run: str) -> Dict[str, Any]:
+    """Analyze two log runs, compare key metrics, and detect drift.
+
+    Args:
+        log_file: Path to the log file of the new (candidate) run.
+        comparison_run: Path to the log file of the baseline run.
+
+    Returns:
+        dict: Metric deltas (new minus baseline) and a timing summary.
+    """
+    logger.info("Starting drift analysis between runs.")
+
+    df_new = _read_log_file(log_file)
+    df_base = _read_log_file(comparison_run)
+
+    # Per-run means of the drift-relevant metrics
+    def _compute_means(df: pd.DataFrame) -> Dict[str, float]:
+        return {
+            'warn_rate': float(df['warn_rate'].mean()),
+            'unknown_rate': float(df['unknown_rate'].mean()),
+            't_index_visible_mean': float(df['t_index_visible'].mean()),
+            't_gate_read_mean': float(df['t_gate_read'].mean()),
+        }
+
+    stats_new = _compute_means(df_new)
+    stats_base = _compute_means(df_base)
+
+    policy_hash_new = df_new['policy_hash'].mode()[0] if not df_new.empty else 'N/A'
+    policy_hash_base = df_base['policy_hash'].mode()[0] if not df_base.empty else 'N/A'
+
+    timing_delta_index = stats_new['t_index_visible_mean'] - stats_base['t_index_visible_mean']
+    timing_delta_gate = stats_new['t_gate_read_mean'] - stats_base['t_gate_read_mean']
+
+    timing_analysis = (
+        f"Δt_index_visible={timing_delta_index:.6f}, Δt_gate_read={timing_delta_gate:.6f}"
+    )
+
+    result = AnalysisResult(
+        policy_hash=f"{policy_hash_base} -> {policy_hash_new}",
+        warn_rate=stats_new['warn_rate'] - stats_base['warn_rate'],
+        unknown_rate=stats_new['unknown_rate'] - stats_base['unknown_rate'],
+        timing_analysis=timing_analysis
+    )
+
+    logger.info("Drift analysis completed successfully.")
+    return {
+        'policy_hash': result.policy_hash,
+        'warn_rate': result.warn_rate,
+        'unknown_rate': result.unknown_rate,
+        'timing_analysis': result.timing_analysis
+    }
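+
+
+# Minimal usage sketch, kept behind a __main__ guard; the file names are
+# hypothetical placeholders, not part of the module's API.
+if __name__ == '__main__':
+    report = analyze_logs('run_new.csv', 'run_baseline.csv')
+    print(report)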