# optimized_ecg.py (15 KB)

import csv
import math
import random
import time
import tkinter as tk
from collections import deque

import customtkinter as ctk
## cc Maximilian Scheinast-Peter
## last update 11.04.2024
## This program works much like a digital oscilloscope for displaying vital-sign curves.
## Attribution-NonCommercial license
  13. def get_data1():
  14. import serial
  15. import time
  16. # Configure serial port (adjust port name as needed)
  17. ser = serial.Serial('COM10', 9600)
  18. # Wait for the serial connection to establish
  19. ser.timeout = 2
  20. #ser.write(b'testcom.py\n') # Send command to start the script
  21. while True:
  22. # Read data from serial port
  23. data = ser.readline().decode().strip()
  24. # Check if data is not empty
  25. #print(data)
  26. if data:
  27. try:
  28. # Attempt to convert data to float
  29. value = float(data)
  30. timestamp = time.time()
  31. return timestamp, value
  32. except ValueError:
  33. print("Invalid data format:", data)
  34. else:
  35. print("Empty data received")
  36. # Add a small delay to prevent rapid looping
  37. time.sleep(0.001)
  38. def get_data(): # example heartfunction
  39. global i
  40. i=i+1
  41. ###print(i)
  42. timestamp = time.time() ## dont change
  43. ## logic for get one datapoint like a read_value function
  44. t = (timestamp % 1) * 2 * math.pi
  45. p_wave = 5 * math.sin(t)
  46. qrs_complex = 40 * math.sin(1.5 * t) * math.exp(-0.25 * t ** 2)
  47. t_wave = 10 * math.sin(2 * t) * math.exp(-0.5 * t ** 2)
  48. value = 50 + p_wave + qrs_complex + t_wave
  49. value = max(0, min(value, 100))
  50. wait_for_next_millisecond()
  51. return timestamp, value
  52. def wait_for_next_millisecond():
  53. """Waits until the next full millisecond."""
  54. current_time = time.time()
  55. next_millisecond = (int(current_time * 1000) + 1) / 1000
  56. time.sleep(next_millisecond - current_time)
# GUI application class. (The synthetic ECG generator is get_data() above.)
  58. class EKGApp:
  59. def __init__(self, master):
  60. self.master = master
  61. master.title("EKG Visualization")
  62. # Main Frame
  63. main_frame = ctk.CTkFrame(master)
  64. main_frame.pack(fill=ctk.BOTH, expand=True)
  65. # Single Sweep Canvas
  66. self.single_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  67. self.single_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  68. # Multi Sweep Canvas
  69. self.multi_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  70. self.multi_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  71. # Input Frame
  72. input_frame = ctk.CTkFrame(main_frame)
  73. input_frame.pack(side=ctk.RIGHT, fill=ctk.Y)
  74. # Data Structures
  75. self.single_data = []
  76. self.multi_data = []
  77. self.logged_data = []
  78. # Time Variables
  79. self.start_time = time.time()
  80. self.multi_sweep_duration = 10 # Initial Laufbanddauer
  81. # Trigger Variables
  82. self.trigger_level = 50
  83. self.trigger_armed = False
  84. self.trigger_paused = False
  85. self.trigger_timestamp = None
  86. self.last_trigger_timestamp = None
  87. self.last_value = 0
  88. self.trigger_count = 0
  89. # Cooldown Variables
  90. self.cooldown_time = 0
  91. self.cooldown_active = False
  92. # Logging Variables
  93. self.logging_active = False
  94. # Batching Variables
  95. self.batch_size = 5
  96. self.current_batch = []
  97. # Input Fields and Buttons (with consistent size)
  98. self.create_input_fields(input_frame)
  99. # Logging Frame
  100. logging_frame = ctk.CTkFrame(master)
  101. logging_frame.pack(fill=ctk.X)
  102. self.create_logging_buttons(logging_frame)
  103. # Update Data Periodically
  104. self.update_data()
  105. # Resize Handling
  106. self.single_canvas.bind("<Configure>", self.on_resize)
  107. self.multi_canvas.bind("<Configure>", self.on_resize)
  108. def create_input_fields(self, frame):
  109. # Consistent width for all input elements
  110. input_width = 150
  111. # Trigger Level
  112. ctk.CTkLabel(frame, text="Trigger Level:").pack()
  113. self.trigger_entry = ctk.CTkEntry(frame, width=input_width)
  114. self.trigger_entry.insert(0, str(self.trigger_level))
  115. self.trigger_entry.pack()
  116. # Flank Selection
  117. ctk.CTkLabel(frame, text="Flanke:").pack()
  118. self.flank_var = ctk.StringVar(value="Steigende Flanke")
  119. self.flank_combobox = ctk.CTkComboBox(frame, variable=self.flank_var,
  120. values=["Steigende Flanke", "Fallende Flanke"], width=input_width)
  121. self.flank_combobox.pack()
  122. # Cooldown
  123. ctk.CTkLabel(frame, text="Cooldown (Sekunden):").pack()
  124. self.cooldown_entry = ctk.CTkEntry(frame, width=input_width)
  125. self.cooldown_entry.insert(0, "0")
  126. self.cooldown_entry.pack()
  127. self.cooldown_button = ctk.CTkButton(frame, text="Cooldown setzen", command=self.set_cooldown,
  128. width=input_width)
  129. self.cooldown_button.pack()
  130. # Ax Time (X-Achsen-Zeit)
  131. ctk.CTkLabel(frame, text="Ax-Zeit (Sekunden):").pack()
  132. self.ax_time_entry = ctk.CTkEntry(frame, width=input_width)
  133. self.ax_time_entry.insert(0, str(self.multi_sweep_duration)) # Initial value
  134. self.ax_time_entry.pack()
  135. self.ax_time_button = ctk.CTkButton(frame, text="Ax-Zeit setzen", command=self.set_ax_time, width=input_width)
  136. self.ax_time_button.pack()
  137. # Trigger Buttons
  138. self.trigger_button = ctk.CTkButton(frame, text="Trigger starten", command=self.toggle_trigger,
  139. width=input_width)
  140. self.trigger_button.pack()
  141. self.pause_button = ctk.CTkButton(frame, text="Trigger pausieren", command=self.pause_trigger,
  142. state=ctk.DISABLED, width=input_width)
  143. self.pause_button.pack()
  144. # Multi Sweep Duration (Laufbanddauer)
  145. ctk.CTkLabel(frame, text="Laufbanddauer (Sekunden):").pack()
  146. self.duration_entry = ctk.CTkEntry(frame, width=input_width)
  147. self.duration_entry.insert(0, "30")
  148. self.duration_entry.pack()
  149. self.duration_button = ctk.CTkButton(frame, text="Dauer setzen", command=self.set_duration, width=input_width)
  150. self.duration_button.pack()
  151. def set_ax_time(self):
  152. try:
  153. self.multi_sweep_duration = float(self.ax_time_entry.get())
  154. except ValueError:
  155. pass
  156. def create_logging_buttons(self, frame):
  157. # Consistent width for logging buttons
  158. button_width = 120
  159. self.start_logging_button = ctk.CTkButton(frame, text="Start Logging", command=self.start_logging,
  160. width=button_width)
  161. self.start_logging_button.pack(side=ctk.LEFT)
  162. self.end_logging_button = ctk.CTkButton(frame, text="End Logging", command=self.end_logging, state=ctk.DISABLED,
  163. width=button_width)
  164. self.end_logging_button.pack(side=ctk.LEFT)
  165. self.save_data_button = ctk.CTkButton(frame, text="Save Data", command=self.save_data, state=ctk.DISABLED,
  166. width=button_width)
  167. self.save_data_button.pack(side=ctk.LEFT)
  168. def set_cooldown(self):
  169. try:
  170. self.cooldown_time = float(self.cooldown_entry.get())
  171. except ValueError:
  172. pass
  173. def set_duration(self):
  174. try:
  175. self.multi_sweep_duration = float(self.duration_entry.get())
  176. except ValueError:
  177. pass
  178. def toggle_trigger(self):
  179. if self.trigger_armed:
  180. self.trigger_armed = False
  181. self.trigger_button.configure(text="Trigger starten")
  182. self.pause_button.configure(state=ctk.DISABLED)
  183. self.single_data = []
  184. self.draw_single_canvas()
  185. else:
  186. try:
  187. self.trigger_level = float(self.trigger_entry.get())
  188. except ValueError:
  189. pass
  190. self.trigger_armed = True
  191. self.trigger_button.configure(text="Trigger stoppen")
  192. self.pause_button.configure(state=ctk.NORMAL)
  193. self.trigger_count = 0
  194. def pause_trigger(self):
  195. self.trigger_paused = not self.trigger_paused
  196. if self.trigger_paused:
  197. self.pause_button.configure(text="Trigger fortsetzen")
  198. else:
  199. self.pause_button.configure(text="Trigger pausieren")
  200. def start_logging(self):
  201. self.logging_active = True
  202. self.logged_data = []
  203. self.start_logging_button.configure(state=ctk.DISABLED)
  204. self.end_logging_button.configure(state=ctk.NORMAL)
  205. def end_logging(self):
  206. self.logging_active = False
  207. self.start_logging_button.configure(state=ctk.NORMAL)
  208. self.end_logging_button.configure(state=ctk.DISABLED)
  209. self.save_data_button.configure(state=ctk.NORMAL)
  210. def save_data(self):
  211. # Popup for file name
  212. ####print(123)
  213. file_name = ctk.CTkInputDialog(text="Enter file name:", title="Save Data").get_input()
  214. if file_name:
  215. try:
  216. with open(f"{file_name}.csv", "w", newline="", encoding='utf-8') as csvfile:
  217. writer = csv.writer(csvfile)
  218. writer.writerow(["Timestamp", "Value"])
  219. writer.writerows(self.logged_data)
  220. self.save_data_button.configure(state=ctk.DISABLED)
  221. except Exception as e:
  222. print(f"Error saving data: {e}")
  223. def update_data(self):
  224. # Get EKG Data
  225. timestamp, value = get_data()
  226. # Collect data points into batches
  227. self.current_batch.append((timestamp, value))
  228. if len(self.current_batch) == self.batch_size:
  229. # Process the batch (e.g., plot it)
  230. #self.multi_data.append((timestamp, value))
  231. self.process_batch(self.current_batch)
  232. # Reset the current batch
  233. self.current_batch = []
  234. # Update Multi Sweep Data
  235. #self.multi_data.append((timestamp, value))
  236. try:
  237. if timestamp - self.multi_data[0][0] > self.multi_sweep_duration:
  238. self.multi_data.pop(0)
  239. except:
  240. pass
  241. # Trigger Logic
  242. if self.trigger_armed and not self.trigger_paused and not self.cooldown_active:
  243. condition = (
  244. value >= self.trigger_level and self.last_value < self.trigger_level) if self.flank_var.get() == "Steigende Flanke" else (
  245. value <= self.trigger_level and self.last_value > self.trigger_level)
  246. if condition:
  247. self.last_trigger_timestamp = timestamp
  248. self.single_data = []
  249. self.trigger_count = 0
  250. if self.cooldown_time > 0:
  251. self.cooldown_active = True
  252. self.master.after(int(self.cooldown_time * 1000), self.end_cooldown)
  253. # Update Single Sweep Data
  254. if self.last_trigger_timestamp is not None and self.trigger_armed and not self.trigger_paused:
  255. self.single_data.append((timestamp, value))
  256. # Update Logged Data
  257. if self.logging_active:
  258. self.logged_data.append((timestamp, value))
  259. # Store last value and update trigger count
  260. self.last_value = value
  261. if self.trigger_armed and not self.trigger_paused:
  262. self.trigger_count += 1
  263. # Call again after 1ms
  264. self.master.after(1,self.update_data)
  265. def end_cooldown(self):
  266. self.cooldown_active = False
  267. def process_batch(self, batch):
  268. global i
  269. i=i+1
  270. #print(i)
  271. # Example: Calculate average value for the batch
  272. total_value = sum(value for _, value in batch)
  273. average_value = total_value / len(batch)
  274. for timestamp, value in batch:
  275. self.multi_data.append((timestamp, value))
  276. ###print(f"Average value for batch: {average_value}")
  277. self.draw_multi_canvas()
  278. # Draw Canvases
  279. self.draw_single_canvas()
  280. # You can implement your own logic here, such as plotting the batch
  281. # on a separate canvas or performing other calculations.
  282. def draw_single_canvas(self):
  283. self.single_canvas.delete("all")
  284. if not self.single_data or self.last_trigger_timestamp is None:
  285. return
  286. # Prepare coordinates for create_line
  287. coords = []
  288. x_scale = self.single_canvas.winfo_width() / self.multi_sweep_duration
  289. y_scale = self.single_canvas.winfo_height() / 100
  290. last_x, last_y = None, None
  291. for timestamp, value in self.single_data:
  292. x = (timestamp - self.last_trigger_timestamp) * x_scale
  293. y = self.single_canvas.winfo_height() - (value * self.single_canvas.winfo_height() / 100)
  294. if last_x is not None:
  295. coords.extend([last_x, last_y, x, y])
  296. last_x, last_y = x, y
  297. # Draw the lines
  298. if coords:
  299. self.single_canvas.create_line(*coords, fill="blue")
  300. def draw_multi_canvas(self):
  301. self.multi_canvas.delete("all")
  302. if not self.multi_data:
  303. return
  304. duration = max(self.multi_data[-1][0] - self.multi_data[0][0], 0.001)
  305. x_scale = self.multi_canvas.winfo_width() / duration
  306. y_scale = self.multi_canvas.winfo_height() / 100
  307. coords = []
  308. for timestamp, value in self.multi_data:
  309. x = (timestamp - self.multi_data[0][0]) * x_scale
  310. y = self.multi_canvas.winfo_height() - (value * y_scale)
  311. coords.extend([x, y])
  312. self.multi_canvas.create_line(*coords, fill="blue")
  313. trigger_y = self.multi_canvas.winfo_height() - (self.trigger_level * y_scale)
  314. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y + 5, fill="red")
  315. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y - 5, fill="red")
  316. def draw_time_axis(self, canvas, start_time, duration):
  317. for i in range(int(duration) + 1):
  318. x = i * canvas.winfo_width() / duration
  319. canvas.create_line(x, 0, x, canvas.winfo_height(), fill="gray", dash=(2, 2))
  320. canvas.create_text(x, canvas.winfo_height() - 10, text=f"{i:.1f}s", anchor=ctk.N)
  321. def on_resize(self, event):
  322. self.draw_single_canvas()
  323. self.draw_multi_canvas()
  324. i=0
  325. if __name__ == "__main__":
  326. ctk.set_appearance_mode("dark") # Modes: "System" (standard), "Dark", "Light"
  327. ctk.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
  328. root = ctk.CTk()
  329. app = EKGApp(root)
  330. root.mainloop()