Add log_analysis/src/log_analysis/cli.py

This commit is contained in:
Mika 2026-02-23 14:48:36 +00:00
parent e8acae0be4
commit 2d2163d869

View file

@ -0,0 +1,70 @@
import argparse
import json
import os
from pathlib import Path
import logging
from typing import Any
from log_analysis.core import analyze_logs
def _setup_logging() -> None:
"""Initialisiert konsistentes Logging für CLI-Nutzung."""
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
def _validate_input_file(path_str: str) -> Path:
"""Validiert, dass die angegebene Datei existiert und lesbar ist."""
path = Path(path_str)
if not path.exists():
raise FileNotFoundError(f"Datei nicht gefunden: {path}")
if not path.is_file():
raise ValueError(f"Pfad ist keine Datei: {path}")
return path
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description='Log Drift Analysis CLI')
parser.add_argument('--log-file', required=True, help='Pfad zur Logdatei des neuen Runs')
parser.add_argument('--comparison-run', required=True, help='Pfad zur Logdatei des Vergleichs-Runs')
parser.add_argument('--output', required=False, default='output/analysis_results.json',
help='Pfad zum Ausgabe-JSON (Standard: output/analysis_results.json)')
return parser.parse_args()
def main() -> None:
"""CLI-Einstiegspunkt für Log Drift Analysis."""
_setup_logging()
args = _parse_args()
try:
log_file = _validate_input_file(args.log_file)
comparison_run = _validate_input_file(args.comparison_run)
output_path = Path(args.output)
logging.info(f'Starte Analyse: {log_file} vs {comparison_run}')
results: dict[str, Any] = analyze_logs(str(log_file), str(comparison_run))
# Validierung der Rückgabe
assert isinstance(results, dict), 'Analysefunktion muss ein Dictionary zurückgeben'
for key in ('policy_hash', 'warn_rate', 'unknown_rate', 'timing_analysis'):
if key not in results:
raise KeyError(f'Fehlender Schlüssel im Analyseergebnis: {key}')
# Schreibvorgang
os.makedirs(output_path.parent, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
logging.info(f'Analyse abgeschlossen. Ergebnisse gespeichert unter: {output_path}')
except Exception as e:
logging.error(f'Fehler bei der Log-Analyse: {e}')
raise
if __name__ == '__main__':
main()