Add geiger_counter_visualizer/src/geiger_counter_visualizer/core.py
This commit is contained in:
commit
5e0b6cb049
1 changed files with 107 additions and 0 deletions
107
geiger_counter_visualizer/src/geiger_counter_visualizer/core.py
Normal file
107
geiger_counter_visualizer/src/geiger_counter_visualizer/core.py
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
import time
|
||||
|
||||
import pandas as pd
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.animation import FuncAnimation
|
||||
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="[%(asctime)s] %(levelname)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataValidationError(ValueError):
|
||||
"""Custom exception for invalid DataPoint input."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class DataPoint:
|
||||
"""Repräsentiert ein einzelnes Messereignis eines Geigerzählers."""
|
||||
|
||||
timestamp: str
|
||||
count: int
|
||||
frequency: float
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not isinstance(self.timestamp, str):
|
||||
raise DataValidationError("timestamp must be a string in ISO format")
|
||||
try:
|
||||
datetime.fromisoformat(self.timestamp)
|
||||
except ValueError as e:
|
||||
raise DataValidationError(f"Invalid timestamp format: {self.timestamp}") from e
|
||||
|
||||
if not isinstance(self.count, int) or self.count < 0:
|
||||
raise DataValidationError("count must be a non-negative integer")
|
||||
if not isinstance(self.frequency, (int, float)) or self.frequency < 0:
|
||||
raise DataValidationError("frequency must be a non-negative number")
|
||||
|
||||
|
||||
# Public API function
|
||||
def visualize_data(data: List[DataPoint]) -> None:
|
||||
"""Visualisiert eine Sequenz von Geigerzählerdaten in Echtzeit."""
|
||||
|
||||
if not isinstance(data, list) or not all(isinstance(d, DataPoint) for d in data):
|
||||
raise DataValidationError("Input data must be a list of DataPoint instances.")
|
||||
|
||||
logger.info(f"Starting visualization with {len(data)} data points.")
|
||||
|
||||
df = pd.DataFrame([{
|
||||
'timestamp': datetime.fromisoformat(d.timestamp),
|
||||
'count': d.count,
|
||||
'frequency': d.frequency
|
||||
} for d in data])
|
||||
|
||||
fig, ax1 = plt.subplots()
|
||||
plt.title("Geiger Counter Data Visualization")
|
||||
|
||||
ax1.set_xlabel("Time")
|
||||
ax1.set_ylabel("Counts", color='tab:blue')
|
||||
line1, = ax1.plot_date([], [], '-', color='tab:blue', label='Counts')
|
||||
|
||||
ax2 = ax1.twinx()
|
||||
ax2.set_ylabel("Frequency (Hz)", color='tab:red')
|
||||
line2, = ax2.plot_date([], [], '-', color='tab:red', label='Frequency')
|
||||
|
||||
plt.legend(loc='upper left')
|
||||
|
||||
def init():
|
||||
line1.set_data([], [])
|
||||
line2.set_data([], [])
|
||||
return line1, line2
|
||||
|
||||
def update(frame_idx):
|
||||
sub_df = df.iloc[:frame_idx + 1]
|
||||
line1.set_data(sub_df['timestamp'], sub_df['count'])
|
||||
line2.set_data(sub_df['timestamp'], sub_df['frequency'])
|
||||
ax1.relim()
|
||||
ax1.autoscale_view()
|
||||
ax2.relim()
|
||||
ax2.autoscale_view()
|
||||
logger.debug(f"Frame {frame_idx+1}/{len(df)} updated.")
|
||||
return line1, line2
|
||||
|
||||
ani = FuncAnimation(
|
||||
fig,
|
||||
update,
|
||||
frames=len(df),
|
||||
init_func=init,
|
||||
interval=500,
|
||||
blit=False,
|
||||
repeat=False,
|
||||
)
|
||||
|
||||
plt.tight_layout()
|
||||
plt.show(block=True)
|
||||
|
||||
logger.info("Visualization completed successfully.")
|
||||
return None
|
||||
Loading…
Reference in a new issue