commit 1ce40ced7ec06cdf435af7a2f4204222df7c9cde
Author: Mika
Date:   Mon Feb 16 15:27:08 2026 +0000

    Add mess_log_processing/src/mess_log_processing/core.py

diff --git a/mess_log_processing/src/mess_log_processing/core.py b/mess_log_processing/src/mess_log_processing/core.py
new file mode 100644
index 0000000..752fe91
--- /dev/null
+++ b/mess_log_processing/src/mess_log_processing/core.py
@@ -0,0 +1,118 @@
+from __future__ import annotations
+import json
+import pandas as pd
+import logging
+from pathlib import Path
+from dataclasses import dataclass
+from typing import Any, List
+
+
+# Configure module-level logger
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+
+@dataclass
+class LogData:
+    """Data model representing a single measurement log entry."""
+    t_publish: float
+    t_gate_read: float
+    t_index_visible: float
+    pinned_flag: bool
+    timeouts: int
+    drift_signature: str
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> LogData:
+        required_fields = {f.name for f in cls.__dataclass_fields__.values()}
+        missing = required_fields - data.keys()
+        if missing:
+            raise ValueError(f"Missing fields {missing} in LogData input")
+        return cls(
+            t_publish=float(data["t_publish"]),
+            t_gate_read=float(data["t_gate_read"]),
+            t_index_visible=float(data["t_index_visible"]),
+            pinned_flag=bool(data["pinned_flag"]),
+            timeouts=int(data["timeouts"]),
+            drift_signature=str(data["drift_signature"]),
+        )
+
+
+def process_logs(json_file: str, csv_file: str) -> pd.DataFrame:
+    """Reads JSONL and CSV measurement logs, merges them, and computes aggregated statistics.
+
+    Args:
+        json_file (str): Path to the JSONL log file with raw measurements.
+        csv_file (str): Path to the CSV overview file.
+
+    Returns:
+        pd.DataFrame: Aggregated statistics with p50, p95, p99, and max per pinned_flag stratum.
+ """ + json_path = Path(json_file) + csv_path = Path(csv_file) + + if not json_path.exists(): + raise FileNotFoundError(f"JSON log file not found: {json_file}") + if not csv_path.exists(): + raise FileNotFoundError(f"CSV file not found: {csv_file}") + + logger.info("Reading JSONL log file: %s", json_file) + json_records: List[dict[str, Any]] = [] + with open(json_path, 'r', encoding='utf-8') as f: + for line_no, line in enumerate(f, start=1): + if not line.strip(): + continue + try: + record_raw = json.loads(line) + record = LogData.from_dict(record_raw) + json_records.append(record.__dict__) + except Exception as e: + logger.warning("Skipping invalid JSON line %d: %s", line_no, e) + + logger.info("Reading CSV summary file: %s", csv_file) + csv_df = pd.read_csv(csv_path) + + # Validate CSV fields + expected_cols = {f.name for f in LogData.__dataclass_fields__.values()} + missing_cols = expected_cols - set(csv_df.columns) + if missing_cols: + raise ValueError(f"CSV file is missing required columns: {missing_cols}") + + json_df = pd.DataFrame(json_records) + combined_df = pd.concat([json_df, csv_df], ignore_index=True) + + logger.info("Computing latency metrics and aggregations") + combined_df['latency_publish_gate'] = combined_df['t_gate_read'] - combined_df['t_publish'] + combined_df['latency_gate_index'] = combined_df['t_index_visible'] - combined_df['t_gate_read'] + + def _aggregate(group: pd.DataFrame) -> pd.Series: + def _quantiles(series: pd.Series) -> dict[str, float]: + return { + 'p50': series.quantile(0.5), + 'p95': series.quantile(0.95), + 'p99': series.quantile(0.99), + 'max': series.max(), + } + + pub_gate = _quantiles(group['latency_publish_gate']) + gate_index = _quantiles(group['latency_gate_index']) + + return pd.Series({ + 'publish_gate_p50': pub_gate['p50'], + 'publish_gate_p95': pub_gate['p95'], + 'publish_gate_p99': pub_gate['p99'], + 'publish_gate_max': pub_gate['max'], + 'gate_index_p50': gate_index['p50'], + 'gate_index_p95': gate_index['p95'], + 'gate_index_p99': gate_index['p99'], + 'gate_index_max': gate_index['max'], + 'timeouts_sum': group['timeouts'].sum(), + 'record_count': len(group) + }) + + aggregated_df = combined_df.groupby('pinned_flag', as_index=False).apply(_aggregate) + + assert not aggregated_df.empty, "Aggregated DataFrame should not be empty" + + logger.info("Aggregation complete. Rows: %d", len(aggregated_df)) + return aggregated_df.reset_index(drop=True)