Add data_analysis/src/data_analysis/core.py
This commit is contained in:
parent
101ec5ec25
commit
986b0c9ede
1 changed files with 91 additions and 0 deletions
91
data_analysis/src/data_analysis/core.py
Normal file
91
data_analysis/src/data_analysis/core.py
Normal file
|
|
@ -0,0 +1,91 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass, asdict
|
||||||
|
from typing import List, Union
|
||||||
|
import numpy as np
|
||||||
|
from scipy.signal import find_peaks
|
||||||
|
import librosa
|
||||||
|
|
||||||
|
|
||||||
|
# Logging setup for CI readiness
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.INFO)
|
||||||
|
if not logger.handlers:
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
formatter = logging.Formatter('[%(asctime)s] %(levelname)s in %(name)s: %(message)s')
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
logger.addHandler(handler)
|
||||||
|
|
||||||
|
|
||||||
|
class PatternAnalysisError(Exception):
|
||||||
|
"""Custom exception for errors during audio pattern analysis."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PatternReport:
|
||||||
|
"""Repräsentiert das Ergebnis einer Mustererkennung in den Audiodaten."""
|
||||||
|
pattern_name: str
|
||||||
|
frequency: float
|
||||||
|
correlation: float
|
||||||
|
comments: str
|
||||||
|
|
||||||
|
def to_json(self) -> str:
|
||||||
|
return json.dumps(asdict(self), ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
|
||||||
|
# Helper validation
|
||||||
|
def _validate_audio_data(audio_data: Union[np.ndarray, list[float]]) -> np.ndarray:
|
||||||
|
if isinstance(audio_data, list):
|
||||||
|
audio_data = np.array(audio_data, dtype=float)
|
||||||
|
if not isinstance(audio_data, np.ndarray):
|
||||||
|
raise TypeError("audio_data must be numpy.ndarray or list[float]")
|
||||||
|
if audio_data.size == 0:
|
||||||
|
raise ValueError("audio_data cannot be empty")
|
||||||
|
if not np.isfinite(audio_data).all():
|
||||||
|
raise ValueError("audio_data contains non-finite values")
|
||||||
|
return audio_data
|
||||||
|
|
||||||
|
|
||||||
|
def analyze_patterns(audio_data: Union[np.ndarray, list[float]]) -> List[PatternReport]:
|
||||||
|
"""Analysiert Audiodaten, extrahiert Frequenzmuster und berechnet Korrelationen zwischen erkannten Mustern."""
|
||||||
|
logger.info("Starting audio pattern analysis.")
|
||||||
|
try:
|
||||||
|
data = _validate_audio_data(audio_data)
|
||||||
|
sr = 22050 # sampling rate assumption
|
||||||
|
|
||||||
|
# Compute FFT magnitude spectrum
|
||||||
|
fft_mag = np.abs(np.fft.rfft(data))
|
||||||
|
freqs = np.fft.rfftfreq(len(data), 1 / sr)
|
||||||
|
|
||||||
|
# Find peaks in frequency domain
|
||||||
|
peaks, _ = find_peaks(fft_mag, height=np.mean(fft_mag) * 1.5)
|
||||||
|
if len(peaks) == 0:
|
||||||
|
logger.warning("No prominent peaks detected in audio data.")
|
||||||
|
|
||||||
|
pattern_reports: List[PatternReport] = []
|
||||||
|
for i, peak_idx in enumerate(peaks[:10]): # limit to top 10 patterns
|
||||||
|
freq = freqs[peak_idx]
|
||||||
|
|
||||||
|
# Simulate a simple correlation heuristic: relative intensity
|
||||||
|
correlation = float(min(1.0, fft_mag[peak_idx] / (np.max(fft_mag) + 1e-9)))
|
||||||
|
pattern_name = f"Pattern_{i+1}"
|
||||||
|
comments = f"Detected frequency peak at {freq:.1f} Hz."
|
||||||
|
|
||||||
|
report = PatternReport(
|
||||||
|
pattern_name=pattern_name,
|
||||||
|
frequency=float(freq),
|
||||||
|
correlation=correlation,
|
||||||
|
comments=comments,
|
||||||
|
)
|
||||||
|
pattern_reports.append(report)
|
||||||
|
|
||||||
|
logger.info(f"Analysis completed: {len(pattern_reports)} patterns detected.")
|
||||||
|
assert all(isinstance(r, PatternReport) for r in pattern_reports), "Return integrity check failed."
|
||||||
|
return pattern_reports
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(f"Error during pattern analysis: {exc}")
|
||||||
|
raise PatternAnalysisError(str(exc)) from exc
|
||||||
Loading…
Reference in a new issue