Add 1_interferenz_metrics_parser/src/interferenz_metrics_parser/core.py
This commit is contained in:
parent
33dbdeca93
commit
377877040a
1 changed files with 76 additions and 0 deletions
|
|
@ -0,0 +1,76 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class InterferenceData:
|
||||||
|
"""Datenmodell für Interferenzmessdaten."""
|
||||||
|
|
||||||
|
run_id: str
|
||||||
|
retry_tail_p99: float
|
||||||
|
band_width: float
|
||||||
|
mix_ratio: float
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidInterferenceLogError(Exception):
|
||||||
|
"""Wird geworfen, wenn die Eingabe-Logdatei ungültige Daten enthält."""
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def parse_interference_logs(log_file_path: str) -> List[InterferenceData]:
|
||||||
|
"""Parst eine JSON-Logdatei und gibt eine Liste von InterferenceData-Objekten zurück.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
log_file_path: Pfad zur JSON-Logdatei mit Interferenzdaten.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Liste von InterferenceData-Objekten.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: Wenn die Datei nicht existiert.
|
||||||
|
InvalidInterferenceLogError: Wenn Daten ungültig sind oder Pfad kein JSON ist.
|
||||||
|
"""
|
||||||
|
path = Path(log_file_path)
|
||||||
|
if not path.exists():
|
||||||
|
raise FileNotFoundError(f"Log file not found: {path}")
|
||||||
|
|
||||||
|
if path.suffix.lower() != ".json":
|
||||||
|
raise InvalidInterferenceLogError(f"Unsupported file format: {path.suffix}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with path.open('r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
raise InvalidInterferenceLogError(f"Invalid JSON content: {e}") from e
|
||||||
|
|
||||||
|
if not isinstance(data, list):
|
||||||
|
raise InvalidInterferenceLogError("Expected top-level JSON array of measurements.")
|
||||||
|
|
||||||
|
parsed: List[InterferenceData] = []
|
||||||
|
for entry in data:
|
||||||
|
if not isinstance(entry, dict):
|
||||||
|
raise InvalidInterferenceLogError(f"Invalid entry type: {type(entry)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
run_id = str(entry["run_id"])
|
||||||
|
retry_tail_p99 = float(entry["retry_tail_p99"])
|
||||||
|
band_width = float(entry["band_width"])
|
||||||
|
mix_ratio = float(entry["mix_ratio"])
|
||||||
|
except (KeyError, ValueError, TypeError) as e:
|
||||||
|
raise InvalidInterferenceLogError(f"Missing or invalid field in entry: {entry}") from e
|
||||||
|
|
||||||
|
parsed.append(
|
||||||
|
InterferenceData(
|
||||||
|
run_id=run_id,
|
||||||
|
retry_tail_p99=retry_tail_p99,
|
||||||
|
band_width=band_width,
|
||||||
|
mix_ratio=mix_ratio,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert all(isinstance(p, InterferenceData) for p in parsed), "All parsed items must be InterferenceData instances."
|
||||||
|
return parsed
|
||||||
Loading…
Reference in a new issue