Add grouped_run_statistics/src/grouped_run_statistics/core.py
This commit is contained in:
parent
bf91d71cbc
commit
69fde90353
1 changed files with 94 additions and 0 deletions
94
grouped_run_statistics/src/grouped_run_statistics/core.py
Normal file
94
grouped_run_statistics/src/grouped_run_statistics/core.py
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections import Counter
|
||||
from typing import List, Dict, Any
|
||||
|
||||
__all__ = ["group_runs"]
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
if not logger.handlers:
|
||||
handler = logging.StreamHandler()
|
||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
class InvalidRunRecordError(ValueError):
|
||||
"""Custom exception raised when run records are invalid."""
|
||||
|
||||
|
||||
class GroupedStatistics(Dict[str, int]):
|
||||
"""Represents aggregated statistics grouped by pinned/unpinned and result status."""
|
||||
|
||||
|
||||
REQUIRED_FIELDS = {"id", "pinned", "status"}
|
||||
VALID_STATUSES = {"PASS", "WARN", "FAIL"}
|
||||
|
||||
|
||||
def group_runs(run_data: List[Dict[str, Any]]) -> Dict[str, int]:
|
||||
"""
|
||||
Gruppiert Testlaufdaten in 'pinned' und 'unpinned' Gruppen und zählt PASS/WARN/FAIL.
|
||||
|
||||
Args:
|
||||
run_data: Liste von Dictionaries mit Schlüsseln 'id', 'pinned' (bool), 'status' (str).
|
||||
|
||||
Returns:
|
||||
Dict[str, int]: Aggregierte Statistik der Runs nach pinned/unpinned und Status.
|
||||
"""
|
||||
|
||||
# Input validation
|
||||
if not isinstance(run_data, list):
|
||||
raise InvalidRunRecordError("Input run_data must be a list of dictionaries.")
|
||||
|
||||
stats_counter = Counter(
|
||||
{
|
||||
"pinned_pass": 0,
|
||||
"unpinned_pass": 0,
|
||||
"pinned_warn": 0,
|
||||
"unpinned_warn": 0,
|
||||
"pinned_fail": 0,
|
||||
"unpinned_fail": 0,
|
||||
}
|
||||
)
|
||||
|
||||
for idx, record in enumerate(run_data):
|
||||
if not isinstance(record, dict):
|
||||
raise InvalidRunRecordError(f"Record at index {idx} is not a dictionary.")
|
||||
|
||||
if not REQUIRED_FIELDS.issubset(record.keys()):
|
||||
raise InvalidRunRecordError(
|
||||
f"Record at index {idx} missing required fields: {REQUIRED_FIELDS - record.keys()}"
|
||||
)
|
||||
|
||||
pinned = record["pinned"]
|
||||
status = record["status"].upper()
|
||||
|
||||
if not isinstance(pinned, bool):
|
||||
raise InvalidRunRecordError(f"Record at index {idx} has non-boolean 'pinned' field.")
|
||||
|
||||
if status not in VALID_STATUSES:
|
||||
raise InvalidRunRecordError(
|
||||
f"Record at index {idx} has invalid status '{status}'. Must be one of {VALID_STATUSES}."
|
||||
)
|
||||
|
||||
key_prefix = "pinned" if pinned else "unpinned"
|
||||
key = f"{key_prefix}_{status.lower()}"
|
||||
stats_counter[key] += 1
|
||||
|
||||
result: GroupedStatistics = GroupedStatistics(stats_counter)
|
||||
|
||||
logger.info(
|
||||
"Grouped %d runs into statistics: pinned_pass=%d, unpinned_pass=%d, pinned_warn=%d, unpinned_warn=%d, pinned_fail=%d, unpinned_fail=%d",
|
||||
len(run_data),
|
||||
result["pinned_pass"],
|
||||
result["unpinned_pass"],
|
||||
result["pinned_warn"],
|
||||
result["unpinned_warn"],
|
||||
result["pinned_fail"],
|
||||
result["unpinned_fail"],
|
||||
)
|
||||
|
||||
return dict(result)
|
||||
Loading…
Reference in a new issue