Add log_analysis/src/log_analysis/core.py

This commit is contained in:
Mika 2026-02-23 14:48:36 +00:00
parent 5bc7ab5461
commit e8acae0be4

View file

@@ -0,0 +1,120 @@
from __future__ import annotations
import pandas as pd
import json
import logging
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, Any
from statistics import mean
# Module-level logger; INFO is set here so load/analysis progress messages
# are emitted by default.
# NOTE(review): setting a level on a library logger overrides the host
# application's logging configuration — consider leaving the level to callers.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@dataclass
class LogEntry:
    """One parsed log record from a run.

    NOTE(review): this type is not referenced by the functions visible in
    this file; field meanings are inferred from the column names that
    ``_read_log_file`` validates — confirm against the log producer.
    """
    timestamp: str        # event timestamp (string form; format not enforced here)
    run_id: str           # identifier of the run the entry belongs to
    policy_hash: str      # hash of the policy active during the run
    warn_rate: float      # per-entry warn rate (averaged downstream)
    unknown_rate: float   # per-entry unknown rate (averaged downstream)
    t_index_visible: float  # timing metric — units not shown in this file
    t_gate_read: float      # timing metric — units not shown in this file
    pinned_status: str    # pin state; semantics not used by visible code
@dataclass
class AnalysisResult:
    """Drift summary produced by ``analyze_logs``.

    ``warn_rate`` and ``unknown_rate`` hold deltas (new run minus
    baseline); ``policy_hash`` holds a ``"<base> -> <new>"`` transition
    string; ``timing_analysis`` is a preformatted Δ-timing summary.
    """
    policy_hash: str
    warn_rate: float
    unknown_rate: float
    timing_analysis: str
class LogAnalysisError(Exception):
    """Raised when a log file is missing, unsupported, or fails validation."""
def _read_log_file(file_path: str) -> pd.DataFrame:
    """Load a CSV or JSON log file into a DataFrame and validate its schema.

    Args:
        file_path: Path to a ``.csv`` or ``.json`` log file.

    Returns:
        DataFrame with the raw log entries.

    Raises:
        LogAnalysisError: If the file does not exist, has an unsupported
            extension, or is missing any required column.
    """
    path = Path(file_path)
    if not path.exists():
        raise LogAnalysisError(f"Log file not found: {file_path}")

    # Compute the lowered suffix once instead of per branch.
    suffix = path.suffix.lower()
    if suffix == '.csv':
        df = pd.read_csv(path)
    elif suffix == '.json':
        df = pd.read_json(path)
    else:
        raise LogAnalysisError(f"Unsupported file format: {path.suffix}")

    # Fail fast on schema drift: every downstream metric needs these columns.
    required_cols = {
        'timestamp', 'run_id', 'policy_hash', 'warn_rate', 'unknown_rate',
        't_index_visible', 't_gate_read', 'pinned_status'
    }
    missing = required_cols - set(df.columns)
    if missing:
        raise LogAnalysisError(f"Missing columns in {file_path}: {missing}")

    # Lazy %-formatting: message is only built if INFO is actually emitted.
    logger.info("Loaded log file %s with %d entries.", file_path, len(df))
    return df
def analyze_logs(log_file: str, comparison_run: str) -> Dict[str, Any]:
    """Compare two log runs and report metric and timing drift.

    Args:
        log_file: Path to the log file of the new (comparison) run.
        comparison_run: Path to the log file of the baseline run.

    Returns:
        Dict with the policy-hash transition (``"<base> -> <new>"``), the
        warn-rate and unknown-rate deltas (new minus baseline), and a
        preformatted timing-drift summary string.

    Raises:
        LogAnalysisError: If either file is missing, unsupported, fails
            schema validation, or contains no entries.
    """
    logger.info("Starting drift analysis between runs.")
    df_new = _read_log_file(log_file)
    df_base = _read_log_file(comparison_run)

    # statistics.mean used to raise StatisticsError on an empty run before
    # the old "if not df.empty" fallback could ever trigger — reject empty
    # runs explicitly with the module's own exception type instead.
    if df_new.empty or df_base.empty:
        raise LogAnalysisError("Cannot analyze drift: a run contains no entries.")

    def _compute_means(df: pd.DataFrame) -> Dict[str, float]:
        # Vectorized pandas mean replaces statistics.mean (pure Python):
        # identical result on non-empty numeric columns, much faster.
        return {
            'warn_rate': float(df['warn_rate'].mean()),
            'unknown_rate': float(df['unknown_rate'].mean()),
            't_index_visible_mean': float(df['t_index_visible'].mean()),
            't_gate_read_mean': float(df['t_gate_read'].mean()),
        }

    stats_new = _compute_means(df_new)
    stats_base = _compute_means(df_base)

    # mode() picks the most frequent hash, tolerating stray mixed entries.
    policy_hash_new = df_new['policy_hash'].mode()[0]
    policy_hash_base = df_base['policy_hash'].mode()[0]

    timing_delta_index = stats_new['t_index_visible_mean'] - stats_base['t_index_visible_mean']
    timing_delta_gate = stats_new['t_gate_read_mean'] - stats_base['t_gate_read_mean']
    timing_analysis = (
        f"Δt_index_visible={timing_delta_index:.6f}, Δt_gate_read={timing_delta_gate:.6f}"
    )

    result = AnalysisResult(
        policy_hash=f"{policy_hash_base} -> {policy_hash_new}",
        warn_rate=stats_new['warn_rate'] - stats_base['warn_rate'],
        unknown_rate=stats_new['unknown_rate'] - stats_base['unknown_rate'],
        timing_analysis=timing_analysis,
    )
    # The former `assert isinstance(...)` sanity checks were removed: they
    # were tautological (values constructed just above) and are stripped
    # under `python -O`, so they provided no real protection.

    logger.info("Drift analysis completed successfully.")
    return {
        'policy_hash': result.policy_hash,
        'warn_rate': result.warn_rate,
        'unknown_rate': result.unknown_rate,
        'timing_analysis': result.timing_analysis
    }