Add data_analysis/src/data_analysis/core.py

This commit is contained in:
Mika 2026-02-22 03:07:04 +00:00
parent f436a8d19f
commit c98bd3bdda

View file

@ -0,0 +1,102 @@
import json
import argparse
import logging
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any
class DataValidationError(Exception):
    """Signal that input data does not have the expected structure.

    The validation helper in this module raises it for a non-list input, a
    non-dict item, a missing required field, or a non-numeric field value.
    """
def _validate_data_points(data: List[Dict[str, Any]], required_fields=("intensity", "background_noise")) -> None:
    """Check that *data* is a list of dicts carrying numeric required fields.

    Args:
        data: Candidate list of measurement records.
        required_fields: Keys every record must contain with a numeric value.

    Raises:
        DataValidationError: If *data* is not a list, an item is not a dict,
            a required field is missing, or a field value is not an ``int``
            or ``float``. Booleans are rejected as well.
    """
    if not isinstance(data, list):
        raise DataValidationError("Input data must be a list of dictionaries.")
    for i, item in enumerate(data):
        if not isinstance(item, dict):
            raise DataValidationError(f"Data item at index {i} is not a dictionary.")
        for field in required_fields:
            if field not in item:
                raise DataValidationError(f"Missing field '{field}' in data item at index {i}.")
            value = item[field]
            # bool is a subclass of int, so isinstance(True, int) is True;
            # exclude it explicitly so JSON true/false is not taken as a number.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise DataValidationError(f"Field '{field}' in item {i} must be numeric.")
def correct_flourescence(raw_data: List[Dict[str, float]]) -> List[Dict[str, float]]:
    """Smooth raw fluorescence intensities and subtract the noise baseline.

    The intensities are smoothed with a centred 3-point moving average
    (shorter windows at the edges), the median of ``background_noise`` is
    subtracted as the baseline, and negative results are clipped to zero.

    Args:
        raw_data: Records with numeric ``intensity`` and ``background_noise``.

    Returns:
        One record per input record holding the corrected ``intensity`` and
        the unchanged ``background_noise``, both as ``float``. Other keys of
        the input records are not carried over. Empty input yields ``[]``.

    Raises:
        DataValidationError: If *raw_data* fails validation.
    """
    _validate_data_points(raw_data)
    if not raw_data:
        # A DataFrame built from an empty list has no columns, so the column
        # lookups below would raise KeyError; there is nothing to correct.
        return []

    df = pd.DataFrame(raw_data)

    # Smoothing (moving-average filter).
    smoothed = df['intensity'].rolling(window=3, min_periods=1, center=True).mean()

    # Baseline subtraction based on the median background noise.
    baseline = df['background_noise'].median()
    corrected = np.maximum(smoothed - baseline, 0)

    # Iterate the two columns directly instead of row-by-row via iterrows().
    return [
        {'intensity': float(value), 'background_noise': float(noise)}
        for value, noise in zip(corrected, df['background_noise'])
    ]
def substract_reference(data: List[Dict[str, float]], reference: List[Dict[str, float]]) -> List[Dict[str, float]]:
    """Subtract reference intensities (e.g. ambient light) from the data.

    Records are paired by position. When the two lists differ in length, only
    the leading records up to the shorter length are processed. Differences
    below zero are clipped to zero.

    Args:
        data: Records with numeric ``intensity`` and ``background_noise``.
        reference: Reference records with the same required fields.

    Returns:
        The (possibly truncated) records of *data* with ``intensity`` replaced
        by the clipped difference; all other columns of *data* are kept.
        Returns ``[]`` when either input is empty.

    Raises:
        DataValidationError: If *data* or *reference* fails validation.
    """
    _validate_data_points(data)
    _validate_data_points(reference)

    # If the lengths differ, work with the smaller size.
    min_len = min(len(data), len(reference))
    if min_len == 0:
        # A DataFrame built from an empty list has no 'intensity' column, so
        # the subtraction below would raise KeyError; no pairs exist anyway.
        return []

    df_data = pd.DataFrame(data).iloc[:min_len].reset_index(drop=True)
    df_ref = pd.DataFrame(reference).iloc[:min_len].reset_index(drop=True)

    df_result = df_data.copy()
    df_result['intensity'] = np.maximum(df_data['intensity'] - df_ref['intensity'], 0)
    return df_result.to_dict(orient='records')
def _load_json(path: Path) -> List[Dict[str, Any]]:
    """Read the UTF-8 encoded file at *path* and return its parsed JSON.

    The parsed value is returned as-is; it is not checked against the
    annotated list-of-dicts shape here.
    """
    text = path.read_text(encoding='utf-8')
    return json.loads(text)
def _save_json(data: List[Dict[str, Any]], path: Path) -> None:
    """Write *data* as indented JSON to *path*, creating parent directories.

    The payload is serialised completely before the file system is touched,
    so a serialisation failure neither creates directories nor truncates a
    file that already exists at *path*.

    Args:
        data: JSON-serialisable records to write.
        path: Destination file; written as UTF-8 with a 2-space indent.

    Raises:
        TypeError: If *data* contains a value ``json`` cannot serialise.
        OSError: If the directory or file cannot be created or written.
    """
    # Serialise first: streaming json.dump into an already-opened file would
    # leave a truncated, invalid file behind if encoding fails part-way.
    payload = json.dumps(data, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(payload, encoding='utf-8')
def main():
    """Run the command-line workflow: load, correct, subtract, save.

    Parses the required ``--input``, ``--reference`` and ``--output`` paths,
    loads the raw and reference JSON files, applies ``correct_flourescence``
    followed by ``substract_reference``, and writes the result to the output
    path. Progress is logged at INFO level.
    """
    cli = argparse.ArgumentParser(description="Fluoreszenzdaten-Korrektur")
    required_options = (
        ('--input', 'Pfad zur Eingabedatei mit Rohdaten (JSON).'),
        ('--reference', 'Pfad zur Referenzdatei (JSON).'),
        ('--output', 'Pfad zur Ausgabedatei (JSON).'),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)
    options = cli.parse_args()

    logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
    log = logging.getLogger(__name__)

    source_file = Path(options.input)
    reference_file = Path(options.reference)
    target_file = Path(options.output)

    # Load both inputs before any processing starts.
    log.info(f"Lade Rohdaten aus {source_file}...")
    measurements = _load_json(source_file)
    log.info(f"Lade Referenzdaten aus {reference_file}...")
    reference_records = _load_json(reference_file)

    # Correct first, then subtract the reference from the corrected values.
    log.info("Korrigiere Fluoreszenzdaten...")
    smoothed_records = correct_flourescence(measurements)
    log.info("Subtrahiere Referenzsignal...")
    final_records = substract_reference(smoothed_records, reference_records)

    _save_json(final_records, target_file)
    log.info(f"Korrigierte Daten wurden nach {target_file} geschrieben.")
# Run the command-line workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()