From 7c6880e2adfb2c38e31a8698d5ee88acb67b2df9 Mon Sep 17 00:00:00 2001 From: Mika Date: Wed, 17 Dec 2025 12:03:31 +0000 Subject: [PATCH] Add trace_analysis_script/src/trace_analysis_script/core.py --- .../src/trace_analysis_script/core.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 trace_analysis_script/src/trace_analysis_script/core.py diff --git a/trace_analysis_script/src/trace_analysis_script/core.py b/trace_analysis_script/src/trace_analysis_script/core.py new file mode 100644 index 0000000..1397c19 --- /dev/null +++ b/trace_analysis_script/src/trace_analysis_script/core.py @@ -0,0 +1,61 @@ +import json +import pandas as pd +import numpy as np +from pathlib import Path +from typing import Dict + +def analyze_trace(trace_file: str) -> Dict[str, float]: + """Analysiert eine Kernel-Trace-Datei und erstellt Statistiken zu Signal- und Zeitoffset-Eigenschaften. + + Parameters + ---------- + trace_file : str + Pfad zur Kernel-Trace-Datei im CSV- oder JSON-Format. + + Returns + ------- + dict + Dictionary mit statistischen Kennwerten: peak_amplitude, median_bandpower, crosscorr_with_clockevents. + """ + path = Path(trace_file) + if not path.exists(): + raise FileNotFoundError(f"Trace file not found: {trace_file}") + + # Laden der Daten je nach Format + if path.suffix.lower() == '.csv': + data = pd.read_csv(trace_file) + elif path.suffix.lower() == '.json': + with open(trace_file, 'r', encoding='utf-8') as f: + json_data = json.load(f) + data = pd.DataFrame(json_data) + else: + raise ValueError("Unsupported file format. Use CSV or JSON.") + + # Eingabevalidierung + if 'signal' not in data.columns or 'timestamp' not in data.columns: + raise ValueError("Input data must contain 'signal' and 'timestamp' columns.") + + # Berechnung der Kennwerte + signal = data['signal'].astype(float) + + peak_amplitude = float(np.max(np.abs(signal))) + median_bandpower = float(np.median(signal ** 2)) + + # Kreuzkorrelation zwischen Signal und Timestamps + ts = data['timestamp'].astype(float) + if len(signal) != len(ts) or len(signal) < 2: + crosscorr_with_clockevents = 0.0 + else: + signal_std = np.std(signal) + ts_std = np.std(ts) + if signal_std == 0 or ts_std == 0: + crosscorr_with_clockevents = 0.0 + else: + corr_matrix = np.corrcoef(signal, ts) + crosscorr_with_clockevents = float(corr_matrix[0, 1]) + + return { + "peak_amplitude": peak_amplitude, + "median_bandpower": median_bandpower, + "crosscorr_with_clockevents": crosscorr_with_clockevents, + }