Add mess_log_processing/src/mess_log_processing/core.py
commit 1ce40ced7e
1 changed file with 118 additions and 0 deletions
mess_log_processing/src/mess_log_processing/core.py (new normal file, 118 lines)
@@ -0,0 +1,118 @@
from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List

import pandas as pd


# Configure module-level logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


@dataclass
class LogData:
    """Data model representing a single measurement log entry."""

    t_publish: float
    t_gate_read: float
    t_index_visible: float
    pinned_flag: bool
    timeouts: int
    drift_signature: str

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> LogData:
        required_fields = {f.name for f in cls.__dataclass_fields__.values()}
        missing = required_fields - data.keys()
        if missing:
            raise ValueError(f"Missing fields {missing} in LogData input")
        return cls(
            t_publish=float(data["t_publish"]),
            t_gate_read=float(data["t_gate_read"]),
            t_index_visible=float(data["t_index_visible"]),
            pinned_flag=bool(data["pinned_flag"]),
            timeouts=int(data["timeouts"]),
            drift_signature=str(data["drift_signature"]),
        )


def process_logs(json_file: str, csv_file: str) -> pd.DataFrame:
    """Reads JSONL and CSV measurement logs, merges them, and computes aggregated statistics.

    Args:
        json_file (str): Path to JSONL log file with raw measurements.
        csv_file (str): Path to CSV overview file.

    Returns:
        pd.DataFrame: Aggregated statistics with p50, p95, p99, and max per stratum.
    """
    json_path = Path(json_file)
    csv_path = Path(csv_file)

    if not json_path.exists():
        raise FileNotFoundError(f"JSON log file not found: {json_file}")
    if not csv_path.exists():
        raise FileNotFoundError(f"CSV file not found: {csv_file}")

    logger.info("Reading JSONL log file: %s", json_file)
    json_records: List[dict[str, Any]] = []
    with open(json_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                record_raw = json.loads(line)
                record = LogData.from_dict(record_raw)
                json_records.append(record.__dict__)
            except Exception as e:
                logger.warning("Skipping invalid JSON line %d: %s", line_no, e)

    logger.info("Reading CSV summary file: %s", csv_file)
    csv_df = pd.read_csv(csv_path)

    # Validate CSV fields
    expected_cols = {f.name for f in LogData.__dataclass_fields__.values()}
    missing_cols = expected_cols - set(csv_df.columns)
    if missing_cols:
        raise ValueError(f"CSV file is missing required columns: {missing_cols}")

    json_df = pd.DataFrame(json_records)
    combined_df = pd.concat([json_df, csv_df], ignore_index=True)

    logger.info("Computing latency metrics and aggregations")
    combined_df['latency_publish_gate'] = combined_df['t_gate_read'] - combined_df['t_publish']
    combined_df['latency_gate_index'] = combined_df['t_index_visible'] - combined_df['t_gate_read']

    def _aggregate(group: pd.DataFrame) -> pd.Series:
        def _quantiles(series: pd.Series) -> dict[str, float]:
            return {
                'p50': series.quantile(0.5),
                'p95': series.quantile(0.95),
                'p99': series.quantile(0.99),
                'max': series.max(),
            }

        pub_gate = _quantiles(group['latency_publish_gate'])
        gate_index = _quantiles(group['latency_gate_index'])

        return pd.Series({
            'publish_gate_p50': pub_gate['p50'],
            'publish_gate_p95': pub_gate['p95'],
            'publish_gate_p99': pub_gate['p99'],
            'publish_gate_max': pub_gate['max'],
            'gate_index_p50': gate_index['p50'],
            'gate_index_p95': gate_index['p95'],
            'gate_index_p99': gate_index['p99'],
            'gate_index_max': gate_index['max'],
            'timeouts_sum': group['timeouts'].sum(),
            'record_count': len(group),
        })

    aggregated_df = combined_df.groupby('pinned_flag', as_index=False).apply(_aggregate)

    assert not aggregated_df.empty, "Aggregated DataFrame should not be empty"

    logger.info("Aggregation complete. Rows: %d", len(aggregated_df))
    return aggregated_df.reset_index(drop=True)
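
A minimal usage sketch of the function added here. It assumes the src layout makes the module importable as mess_log_processing.core and that pandas is installed; the file names and sample values (measurements.jsonl, overview.csv, the record contents) are illustrative only, not taken from the commit.

    # Usage sketch; import path, file names, and values are assumptions for illustration.
    import json
    import pandas as pd
    from mess_log_processing.core import process_logs

    # One JSONL record per line, carrying every LogData field.
    records = [
        {"t_publish": 0.00, "t_gate_read": 0.12, "t_index_visible": 0.45,
         "pinned_flag": True, "timeouts": 0, "drift_signature": "a1"},
        {"t_publish": 1.00, "t_gate_read": 1.30, "t_index_visible": 1.90,
         "pinned_flag": False, "timeouts": 1, "drift_signature": "b2"},
    ]
    with open("measurements.jsonl", "w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")

    # The CSV overview must contain at least the same columns as LogData.
    pd.DataFrame(records).to_csv("overview.csv", index=False)

    stats = process_logs("measurements.jsonl", "overview.csv")
    print(stats)  # one row per pinned_flag stratum with p50/p95/p99/max latencies

The result has one row per value of pinned_flag, with the publish-to-gate and gate-to-index latency quantiles, the summed timeouts, and the record count per stratum.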