Add audio_signal_analysis/src/audio_signal_analysis/core.py

This commit is contained in:
Mika 2026-06-21 02:07:25 +00:00
parent 895de7ce3e
commit ed72bf7054

View file

@ -0,0 +1,65 @@
import logging
from typing import Dict, Any
import numpy as np
from scipy import signal
import pandas as pd
import math
def analyze_audio_signal(audio_data: Dict[str, Any]) -> float:
"""Analyse der Audiozeitreihe und Berechnung der Korrelation mit Temperaturdaten.
Erwartet ein Dictionary mit Schlüsseln 'audio' und 'temperature', die jeweils
Zeitreihen mit Zeitstempeln und Messwerten enthalten.
Args:
audio_data: Ein Dictionary mit Audiound Temperaturdaten.
Returns:
float: Korrelationskoeffizient zwischen Audio- und Temperaturdaten.
"""
logger = logging.getLogger(__name__)
if not isinstance(audio_data, dict):
raise TypeError("audio_data muss ein Dictionary sein.")
required_keys = {"audio", "temperature"}
if not required_keys.issubset(audio_data.keys()):
raise ValueError(f"audio_data muss die Schlüssel {required_keys} enthalten.")
audio_df = pd.DataFrame(audio_data["audio"])
temp_df = pd.DataFrame(audio_data["temperature"])
if not {"timestamp", "audio_level"}.issubset(audio_df.columns):
raise ValueError("Fehlende Spalten in audio_data['audio'].")
if not {"timestamp", "temperature_c"}.issubset(temp_df.columns):
raise ValueError("Fehlende Spalten in audio_data['temperature'].")
# Konvertiere Zeitstempel und sortiere nach Zeit
audio_df['timestamp'] = pd.to_datetime(audio_df['timestamp'])
temp_df['timestamp'] = pd.to_datetime(temp_df['timestamp'])
audio_df.sort_values('timestamp', inplace=True)
temp_df.sort_values('timestamp', inplace=True)
# Interpolation auf gemeinsame Zeitbasis
merged = pd.merge_asof(audio_df, temp_df, on='timestamp', direction='nearest')
audio_series = merged['audio_level'].to_numpy(dtype=float)
temp_series = merged['temperature_c'].to_numpy(dtype=float)
if len(audio_series) < 2 or len(temp_series) < 2:
raise ValueError("Zu wenige Datenpunkte für Korrelationsberechnung.")
# Normalisierung
audio_series = (audio_series - np.mean(audio_series)) / np.std(audio_series)
temp_series = (temp_series - np.mean(temp_series)) / np.std(temp_series)
# Korrelation über FFT-Kreuzkorrelation
corr = signal.correlate(audio_series, temp_series, mode='valid')
corr_norm = corr / np.sqrt(np.sum(audio_series**2) * np.sum(temp_series**2))
correlation_coefficient = float(np.max(corr_norm))
# Sicherheit: Grenze auf [-1, 1]
correlation_coefficient = max(-1.0, min(1.0, correlation_coefficient))
logging.debug("Berechneter Korrelationskoeffizient: %s", correlation_coefficient)
assert math.isfinite(correlation_coefficient), "Korrelationskoeffizient ist nicht endlich"
return correlation_coefficient