Add audio_visualizer/src/audio_visualizer/io_utils.py

This commit is contained in:
Mika 2026-02-08 03:06:31 +00:00
parent 6f45028af0
commit 05488822ee

View file

@ -0,0 +1,83 @@
from __future__ import annotations
import csv
import logging
from pathlib import Path
from typing import NamedTuple
import numpy as np
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
class SensorDataRow(NamedTuple):
"""Repräsentiert eine einzelne Zeile von Sensordaten."""
timestamp: float
frequency: float
amplitude: float
class AudioDataLoadError(Exception):
"""Wird ausgelöst, wenn beim Laden der Audio- oder Sensordaten ein Fehler auftritt."""
def load_audio_data(path: str) -> np.ndarray:
"""Lädt und parst Sensordaten oder Audiodateien zur Weiterverarbeitung.
Unterstützt aktuell CSV-Dateien mit Spalten: timestamp, frequency, amplitude.
Args:
path (str): Pfad zur Eingabedatei (CSV oder WAV).
Returns:
np.ndarray: Ein Array mit den geladenen Daten (timestamp, frequency, amplitude).
Raises:
AudioDataLoadError: Wenn die Datei nicht existiert oder unlesbar ist.
ValueError: Wenn Daten ungültig oder unvollständig sind.
"""
file_path = Path(path)
if not file_path.exists():
logger.error(f"Datei nicht gefunden: {path}")
raise AudioDataLoadError(f"Datei nicht gefunden: {path}")
if file_path.suffix.lower() != '.csv':
logger.error(f"Nur CSV-Dateien werden derzeit unterstützt: {path}")
raise AudioDataLoadError(f"Nur CSV-Dateien werden derzeit unterstützt: {path}")
data_rows: list[SensorDataRow] = []
with file_path.open('r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
required_fields = {'timestamp', 'frequency', 'amplitude'}
if not required_fields.issubset(reader.fieldnames or {}):
logger.error(f"CSV-Datei enthält nicht alle erforderlichen Felder: {reader.fieldnames}")
raise ValueError("Ungültiges CSV-Format: timestamp, frequency, amplitude erforderlich.")
for row in reader:
try:
timestamp = float(row['timestamp'])
frequency = float(row['frequency'])
amplitude = float(row['amplitude'])
data_rows.append(SensorDataRow(timestamp, frequency, amplitude))
except (KeyError, ValueError) as e:
logger.warning(f"Überspringe fehlerhafte Zeile ({row}): {e}")
if not data_rows:
logger.error("Keine gültigen Datenzeilen gefunden.")
raise ValueError("Keine gültigen Sensordaten enthalten.")
data_array = np.array(data_rows, dtype=float)
logger.info(f"Geladene Datensätze: {len(data_array)} aus {path}")
# Zusätzliche Validierung
assert data_array.shape[1] == 3, "Jede Zeile muss drei Werte enthalten: timestamp, frequency, amplitude"
return data_array