optimized_ecg.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. import customtkinter as ctk
  2. import time
  3. import math
  4. import random
  5. import csv
  6. import tkinter as tk
  7. import customtkinter as ctk
  8. import time
  9. ## cc maximilian scheinast-peter
  10. ## last update 11.04.2024
  11. ## dieses programm funktioniert ähnlich wie ein digitales Oszi zur Darstellung von Vitalkurven
  12. ## Attribution-NonCommercial license
  13. def get_data1():
  14. import serial
  15. import time
  16. # Configure serial port (adjust port name as needed)
  17. ser = serial.Serial('COM10', 9600)
  18. # Wait for the serial connection to establish
  19. ser.timeout = 2
  20. #ser.write(b'testcom.py\n') # Send command to start the script
  21. while True:
  22. # Read data from serial port
  23. data = ser.readline().decode().strip()
  24. # Check if data is not empty
  25. #print(data)
  26. if data:
  27. try:
  28. # Attempt to convert data to float
  29. value = float(data)
  30. timestamp = time.time()
  31. return timestamp, value
  32. except ValueError:
  33. print("Invalid data format:", data)
  34. else:
  35. print("Empty data received")
  36. # Add a small delay to prevent rapid looping
  37. time.sleep(0.001)
  38. def get_data(): # example heartfunction
  39. global i
  40. i=i+1
  41. ###print(i)
  42. timestamp = time.time() ## dont change
  43. ## logic for get one datapoint like a read_value function
  44. t = (timestamp % 1) * 2 * math.pi
  45. p_wave = 5 * math.sin(t)
  46. qrs_complex = 40 * math.sin(1.5 * t) * math.exp(-0.25 * t ** 2)
  47. t_wave = 10 * math.sin(2 * t) * math.exp(-0.5 * t ** 2)
  48. value = 50 + p_wave + qrs_complex + t_wave
  49. value = max(0, min(value, 100))
  50. wait_for_next_millisecond()
  51. return timestamp, value
  52. def wait_for_next_millisecond():
  53. """Waits until the next full millisecond."""
  54. current_time = time.time()
  55. next_millisecond = (int(current_time * 1000) + 1) / 1000
  56. time.sleep(next_millisecond - current_time)
  57. # Function to generate realistic ECG data
  58. class EKGApp:
  59. def __init__(self, master):
  60. self.master = master
  61. master.title("EKG Visualization")
  62. # Main Frame
  63. main_frame = ctk.CTkFrame(master)
  64. main_frame.pack(fill=ctk.BOTH, expand=True)
  65. # Single Sweep Canvas
  66. self.single_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  67. self.single_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  68. # Multi Sweep Canvas
  69. self.multi_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  70. self.multi_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  71. # Input Frame
  72. input_frame = ctk.CTkFrame(main_frame)
  73. input_frame.pack(side=ctk.RIGHT, fill=ctk.Y)
  74. # Data Structures
  75. self.single_data = []
  76. self.multi_data = []
  77. self.logged_data = []
  78. # Time Variables
  79. self.start_time = time.time()
  80. self.multi_sweep_duration = 10 # Initial Laufbanddauer
  81. # Trigger Variables
  82. self.trigger_level = 50
  83. self.trigger_armed = False
  84. self.trigger_paused = False
  85. self.trigger_timestamp = None
  86. self.last_trigger_timestamp = None
  87. self.last_value = 0
  88. self.trigger_count = 0
  89. # Cooldown Variables
  90. self.cooldown_time = 0
  91. self.cooldown_active = False
  92. # Logging Variables
  93. self.logging_active = False
  94. # Batching Variables
  95. self.batch_size = 5
  96. self.current_batch = []
  97. # Input Fields and Buttons (with consistent size)
  98. self.create_input_fields(input_frame)
  99. # Logging Frame
  100. logging_frame = ctk.CTkFrame(master)
  101. logging_frame.pack(fill=ctk.X)
  102. self.create_logging_buttons(logging_frame)
  103. # Update Data Periodically
  104. self.update_data()
  105. # Resize Handling
  106. self.single_canvas.bind("<Configure>", self.on_resize)
  107. self.multi_canvas.bind("<Configure>", self.on_resize)
  108. def create_input_fields(self, frame):
  109. # Consistent width for all input elements
  110. input_width = 150
  111. # Trigger Level
  112. ctk.CTkLabel(frame, text="Trigger Level:").pack()
  113. self.trigger_entry = ctk.CTkEntry(frame, width=input_width)
  114. self.trigger_entry.insert(0, str(self.trigger_level))
  115. self.trigger_entry.pack()
  116. # Flank Selection
  117. ctk.CTkLabel(frame, text="Flanke:").pack()
  118. self.flank_var = ctk.StringVar(value="Steigende Flanke")
  119. self.flank_combobox = ctk.CTkComboBox(frame, variable=self.flank_var,
  120. values=["Steigende Flanke", "Fallende Flanke"], width=input_width)
  121. self.flank_combobox.pack()
  122. # Cooldown
  123. ctk.CTkLabel(frame, text="Cooldown (Sekunden):").pack()
  124. self.cooldown_entry = ctk.CTkEntry(frame, width=input_width)
  125. self.cooldown_entry.insert(0, "0")
  126. self.cooldown_entry.pack()
  127. self.cooldown_button = ctk.CTkButton(frame, text="Cooldown setzen", command=self.set_cooldown,
  128. width=input_width)
  129. self.cooldown_button.pack()
  130. # Ax Time (X-Achsen-Zeit)
  131. ctk.CTkLabel(frame, text="Ax-Zeit (Sekunden):").pack()
  132. self.ax_time_entry = ctk.CTkEntry(frame, width=input_width)
  133. self.ax_time_entry.insert(0, str(self.multi_sweep_duration)) # Initial value
  134. self.ax_time_entry.pack()
  135. self.ax_time_button = ctk.CTkButton(frame, text="Ax-Zeit setzen", command=self.set_ax_time, width=input_width)
  136. self.ax_time_button.pack()
  137. # Trigger Buttons
  138. self.trigger_button = ctk.CTkButton(frame, text="Trigger starten", command=self.toggle_trigger,
  139. width=input_width)
  140. self.trigger_button.pack()
  141. self.pause_button = ctk.CTkButton(frame, text="Trigger pausieren", command=self.pause_trigger,
  142. state=ctk.DISABLED, width=input_width)
  143. self.pause_button.pack()
  144. # Multi Sweep Duration (Laufbanddauer)
  145. ctk.CTkLabel(frame, text="Laufbanddauer (Sekunden):").pack()
  146. self.duration_entry = ctk.CTkEntry(frame, width=input_width)
  147. self.duration_entry.insert(0, "30")
  148. self.duration_entry.pack()
  149. self.duration_button = ctk.CTkButton(frame, text="Dauer setzen", command=self.set_duration, width=input_width)
  150. self.duration_button.pack()
  151. def set_ax_time(self):
  152. try:
  153. self.multi_sweep_duration = float(self.ax_time_entry.get())
  154. except ValueError:
  155. pass
  156. def create_logging_buttons(self, frame):
  157. # Consistent width for logging buttons
  158. button_width = 120
  159. self.start_logging_button = ctk.CTkButton(frame, text="Start Logging", command=self.start_logging,
  160. width=button_width)
  161. self.start_logging_button.pack(side=ctk.LEFT)
  162. self.end_logging_button = ctk.CTkButton(frame, text="End Logging", command=self.end_logging, state=ctk.DISABLED,
  163. width=button_width)
  164. self.end_logging_button.pack(side=ctk.LEFT)
  165. self.save_data_button = ctk.CTkButton(frame, text="Save Data", command=self.save_data, state=ctk.DISABLED,
  166. width=button_width)
  167. self.save_data_button.pack(side=ctk.LEFT)
  168. def set_cooldown(self):
  169. try:
  170. self.cooldown_time = float(self.cooldown_entry.get())
  171. except ValueError:
  172. pass
  173. def set_duration(self):
  174. try:
  175. self.multi_sweep_duration = float(self.duration_entry.get())
  176. except ValueError:
  177. pass
  178. def toggle_trigger(self):
  179. if self.trigger_armed:
  180. self.trigger_armed = False
  181. self.trigger_button.configure(text="Trigger starten")
  182. self.pause_button.configure(state=ctk.DISABLED)
  183. self.single_data = []
  184. self.draw_single_canvas()
  185. else:
  186. try:
  187. self.trigger_level = float(self.trigger_entry.get())
  188. except ValueError:
  189. pass
  190. self.trigger_armed = True
  191. self.trigger_button.configure(text="Trigger stoppen")
  192. self.pause_button.configure(state=ctk.NORMAL)
  193. self.trigger_count = 0
  194. def pause_trigger(self):
  195. self.trigger_paused = not self.trigger_paused
  196. if self.trigger_paused:
  197. self.pause_button.configure(text="Trigger fortsetzen")
  198. else:
  199. self.pause_button.configure(text="Trigger pausieren")
  200. def start_logging(self):
  201. self.logging_active = True
  202. self.logged_data = []
  203. self.start_logging_button.configure(state=ctk.DISABLED)
  204. self.end_logging_button.configure(state=ctk.NORMAL)
  205. def end_logging(self):
  206. self.logging_active = False
  207. self.start_logging_button.configure(state=ctk.NORMAL)
  208. self.end_logging_button.configure(state=ctk.DISABLED)
  209. self.save_data_button.configure(state=ctk.NORMAL)
  210. def save_data(self):
  211. # Popup for file name
  212. ####print(123)
  213. file_name = ctk.CTkInputDialog(text="Enter file name:", title="Save Data").get_input()
  214. if file_name:
  215. try:
  216. with open(f"{file_name}.csv", "w", newline="", encoding='utf-8') as csvfile:
  217. writer = csv.writer(csvfile)
  218. writer.writerow(["Timestamp", "Value"])
  219. writer.writerows(self.logged_data)
  220. self.save_data_button.configure(state=ctk.DISABLED)
  221. except Exception as e:
  222. print(f"Error saving data: {e}")
  223. def update_data(self):
  224. # Get EKG Data
  225. timestamp, value = get_data()
  226. # Collect data points into batches
  227. self.current_batch.append((timestamp, value))
  228. if len(self.current_batch) == self.batch_size:
  229. # Process the batch (e.g., plot it)
  230. #self.multi_data.append((timestamp, value))
  231. self.process_batch(self.current_batch)
  232. # Reset the current batch
  233. self.current_batch = []
  234. # Update Multi Sweep Data
  235. #self.multi_data.append((timestamp, value))
  236. try:
  237. if timestamp - self.multi_data[0][0] > self.multi_sweep_duration:
  238. self.multi_data.pop(0)
  239. except:
  240. pass
  241. # Trigger Logic
  242. if self.trigger_armed and not self.trigger_paused and not self.cooldown_active:
  243. condition = (
  244. value >= self.trigger_level and self.last_value < self.trigger_level) if self.flank_var.get() == "Steigende Flanke" else (
  245. value <= self.trigger_level and self.last_value > self.trigger_level)
  246. if condition:
  247. self.last_trigger_timestamp = timestamp
  248. self.single_data = []
  249. self.trigger_count = 0
  250. if self.cooldown_time > 0:
  251. self.cooldown_active = True
  252. self.master.after(int(self.cooldown_time * 1000), self.end_cooldown)
  253. # Update Single Sweep Data
  254. if self.last_trigger_timestamp is not None and self.trigger_armed and not self.trigger_paused:
  255. self.single_data.append((timestamp, value))
  256. # Update Logged Data
  257. if self.logging_active:
  258. self.logged_data.append((timestamp, value))
  259. # Store last value and update trigger count
  260. self.last_value = value
  261. if self.trigger_armed and not self.trigger_paused:
  262. self.trigger_count += 1
  263. # Call again after 1ms
  264. self.master.after(1,self.update_data)
  265. def end_cooldown(self):
  266. self.cooldown_active = False
  267. def process_batch(self, batch):
  268. global i
  269. i=i+1
  270. #print(i)
  271. # Example: Calculate average value for the batch
  272. total_value = sum(value for _, value in batch)
  273. average_value = total_value / len(batch)
  274. for timestamp, value in batch:
  275. self.multi_data.append((timestamp, value))
  276. ###print(f"Average value for batch: {average_value}")
  277. self.draw_multi_canvas()
  278. # Draw Canvases
  279. self.draw_single_canvas()
  280. # You can implement your own logic here, such as plotting the batch
  281. # on a separate canvas or performing other calculations.
  282. def draw_single_canvas(self):
  283. self.single_canvas.delete("all")
  284. if not self.single_data or self.last_trigger_timestamp is None:
  285. return
  286. # Prepare coordinates for create_line
  287. coords = []
  288. x_scale = self.single_canvas.winfo_width() / self.multi_sweep_duration
  289. y_scale = self.single_canvas.winfo_height() / 100
  290. last_x, last_y = None, None
  291. for timestamp, value in self.single_data:
  292. x = (timestamp - self.last_trigger_timestamp) * x_scale
  293. y = self.single_canvas.winfo_height() - (value * self.single_canvas.winfo_height() / 100)
  294. if last_x is not None:
  295. coords.extend([last_x, last_y, x, y])
  296. last_x, last_y = x, y
  297. # Draw the lines
  298. if coords:
  299. self.single_canvas.create_line(*coords, fill="blue")
  300. def draw_multi_canvas(self):
  301. self.multi_canvas.delete("all")
  302. if not self.multi_data:
  303. return
  304. duration = max(self.multi_data[-1][0] - self.multi_data[0][0], 0.001)
  305. x_scale = self.multi_canvas.winfo_width() / duration
  306. y_scale = self.multi_canvas.winfo_height() / 100
  307. coords = []
  308. for timestamp, value in self.multi_data:
  309. x = (timestamp - self.multi_data[0][0]) * x_scale
  310. y = self.multi_canvas.winfo_height() - (value * y_scale)
  311. coords.extend([x, y])
  312. self.multi_canvas.create_line(*coords, fill="blue")
  313. trigger_y = self.multi_canvas.winfo_height() - (self.trigger_level * y_scale)
  314. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y + 5, fill="red")
  315. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y - 5, fill="red")
  316. def draw_time_axis(self, canvas, start_time, duration):
  317. for i in range(int(duration) + 1):
  318. x = i * canvas.winfo_width() / duration
  319. canvas.create_line(x, 0, x, canvas.winfo_height(), fill="gray", dash=(2, 2))
  320. canvas.create_text(x, canvas.winfo_height() - 10, text=f"{i:.1f}s", anchor=ctk.N)
  321. def on_resize(self, event):
  322. self.draw_single_canvas()
  323. self.draw_multi_canvas()
  324. i=0
  325. if __name__ == "__main__":
  326. ctk.set_appearance_mode("dark") # Modes: "System" (standard), "Dark", "Light"
  327. ctk.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
  328. root = ctk.CTk()
  329. app = EKGApp(root)
  330. root.mainloop()