oszi.py 14 KB


  1. import customtkinter as ctk
  2. import time
  3. import math
  4. import random
  5. import csv
  6. import tkinter as tk
  7. import customtkinter as ctk
  8. import time
  9. import serial
  10. import time
  11. ## cc maximilian scheinast-peter
  12. ## last update 11.04.2024
  13. ## dieses programm funktioniert ähnlich wie ein digitales Oszi zur Darstellung von Vitalkurven
  14. ## Attribution-NonCommercial license
  15. def get_data():
  16. # Configure serial port (adjust port name as needed)
  17. ser = serial.Serial('/dev/ttyACM0', 115200)
  18. # Wait for the serial connection to establish
  19. ser.timeout = 2
  20. #ser.write(b'testcom.py\n') # Send command to start the script
  21. while True:
  22. # Read data from serial port
  23. data = ser.readline().decode().strip()
  24. # Check if data is not empty
  25. #print(data)
  26. if data:
  27. try:
  28. # Attempt to convert data to float
  29. value = float(data)
  30. timestamp = time.time()
  31. return timestamp, value
  32. except ValueError:
  33. print("Invalid data format:", data)
  34. else:
  35. print("Empty data received")
  36. # Add a small delay to prevent rapid looping
  37. time.sleep(0.001)
  38. def wait_for_next_millisecond():
  39. """Waits until the next full millisecond."""
  40. current_time = time.time()
  41. next_millisecond = (int(current_time * 1000) + 1) / 1000
  42. time.sleep(next_millisecond - current_time)
  43. # Function to generate realistic ECG data
  44. class EKGApp:
  45. def __init__(self, master):
  46. self.master = master
  47. master.title("EKG Visualization")
  48. # Main Frame
  49. main_frame = ctk.CTkFrame(master)
  50. main_frame.pack(fill=ctk.BOTH, expand=True)
  51. # Single Sweep Canvas
  52. self.single_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  53. self.single_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  54. # Multi Sweep Canvas
  55. self.multi_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  56. self.multi_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  57. # Input Frame
  58. input_frame = ctk.CTkFrame(main_frame)
  59. input_frame.pack(side=ctk.RIGHT, fill=ctk.Y)
  60. # Data Structures
  61. self.single_data = []
  62. self.multi_data = []
  63. self.logged_data = []
  64. # Time Variables
  65. self.start_time = time.time()
  66. self.multi_sweep_duration = 10 # Initial Laufbanddauer
  67. # Trigger Variables
  68. self.trigger_level = 50
  69. self.trigger_armed = False
  70. self.trigger_paused = False
  71. self.trigger_timestamp = None
  72. self.last_trigger_timestamp = None
  73. self.last_value = 0
  74. self.trigger_count = 0
  75. # Cooldown Variables
  76. self.cooldown_time = 0
  77. self.cooldown_active = False
  78. # Logging Variables
  79. self.logging_active = False
  80. # Batching Variables
  81. self.batch_size = 5
  82. self.current_batch = []
  83. # Input Fields and Buttons (with consistent size)
  84. self.create_input_fields(input_frame)
  85. # Logging Frame
  86. logging_frame = ctk.CTkFrame(master)
  87. logging_frame.pack(fill=ctk.X)
  88. self.create_logging_buttons(logging_frame)
  89. # Update Data Periodically
  90. self.update_data()
  91. # Resize Handling
  92. self.single_canvas.bind("<Configure>", self.on_resize)
  93. self.multi_canvas.bind("<Configure>", self.on_resize)
  94. def create_input_fields(self, frame):
  95. # Consistent width for all input elements
  96. input_width = 150
  97. # Trigger Level
  98. ctk.CTkLabel(frame, text="Trigger Level:").pack()
  99. self.trigger_entry = ctk.CTkEntry(frame, width=input_width)
  100. self.trigger_entry.insert(0, str(self.trigger_level))
  101. self.trigger_entry.pack()
  102. # Flank Selection
  103. ctk.CTkLabel(frame, text="Flanke:").pack()
  104. self.flank_var = ctk.StringVar(value="Steigende Flanke")
  105. self.flank_combobox = ctk.CTkComboBox(frame, variable=self.flank_var,
  106. values=["Steigende Flanke", "Fallende Flanke"], width=input_width)
  107. self.flank_combobox.pack()
  108. # Cooldown
  109. ctk.CTkLabel(frame, text="Cooldown (Sekunden):").pack()
  110. self.cooldown_entry = ctk.CTkEntry(frame, width=input_width)
  111. self.cooldown_entry.insert(0, "0")
  112. self.cooldown_entry.pack()
  113. self.cooldown_button = ctk.CTkButton(frame, text="Cooldown setzen", command=self.set_cooldown,
  114. width=input_width)
  115. self.cooldown_button.pack()
  116. # Ax Time (X-Achsen-Zeit)
  117. ctk.CTkLabel(frame, text="Ax-Zeit (Sekunden):").pack()
  118. self.ax_time_entry = ctk.CTkEntry(frame, width=input_width)
  119. self.ax_time_entry.insert(0, str(self.multi_sweep_duration)) # Initial value
  120. self.ax_time_entry.pack()
  121. self.ax_time_button = ctk.CTkButton(frame, text="Ax-Zeit setzen", command=self.set_ax_time, width=input_width)
  122. self.ax_time_button.pack()
  123. # Trigger Buttons
  124. self.trigger_button = ctk.CTkButton(frame, text="Trigger starten", command=self.toggle_trigger,
  125. width=input_width)
  126. self.trigger_button.pack()
  127. self.pause_button = ctk.CTkButton(frame, text="Trigger pausieren", command=self.pause_trigger,
  128. state=ctk.DISABLED, width=input_width)
  129. self.pause_button.pack()
  130. # Multi Sweep Duration (Laufbanddauer)
  131. ctk.CTkLabel(frame, text="Laufbanddauer (Sekunden):").pack()
  132. self.duration_entry = ctk.CTkEntry(frame, width=input_width)
  133. self.duration_entry.insert(0, "30")
  134. self.duration_entry.pack()
  135. self.duration_button = ctk.CTkButton(frame, text="Dauer setzen", command=self.set_duration, width=input_width)
  136. self.duration_button.pack()
  137. def set_ax_time(self):
  138. try:
  139. self.multi_sweep_duration = float(self.ax_time_entry.get())
  140. except ValueError:
  141. pass
  142. def create_logging_buttons(self, frame):
  143. # Consistent width for logging buttons
  144. button_width = 120
  145. self.start_logging_button = ctk.CTkButton(frame, text="Start Logging", command=self.start_logging,
  146. width=button_width)
  147. self.start_logging_button.pack(side=ctk.LEFT)
  148. self.end_logging_button = ctk.CTkButton(frame, text="End Logging", command=self.end_logging, state=ctk.DISABLED,
  149. width=button_width)
  150. self.end_logging_button.pack(side=ctk.LEFT)
  151. self.save_data_button = ctk.CTkButton(frame, text="Save Data", command=self.save_data, state=ctk.DISABLED,
  152. width=button_width)
  153. self.save_data_button.pack(side=ctk.LEFT)
  154. def set_cooldown(self):
  155. try:
  156. self.cooldown_time = float(self.cooldown_entry.get())
  157. except ValueError:
  158. pass
  159. def set_duration(self):
  160. try:
  161. self.multi_sweep_duration = float(self.duration_entry.get())
  162. except ValueError:
  163. pass
  164. def toggle_trigger(self):
  165. if self.trigger_armed:
  166. self.trigger_armed = False
  167. self.trigger_button.configure(text="Trigger starten")
  168. self.pause_button.configure(state=ctk.DISABLED)
  169. self.single_data = []
  170. self.draw_single_canvas()
  171. else:
  172. try:
  173. self.trigger_level = float(self.trigger_entry.get())
  174. except ValueError:
  175. pass
  176. self.trigger_armed = True
  177. self.trigger_button.configure(text="Trigger stoppen")
  178. self.pause_button.configure(state=ctk.NORMAL)
  179. self.trigger_count = 0
  180. def pause_trigger(self):
  181. self.trigger_paused = not self.trigger_paused
  182. if self.trigger_paused:
  183. self.pause_button.configure(text="Trigger fortsetzen")
  184. else:
  185. self.pause_button.configure(text="Trigger pausieren")
  186. def start_logging(self):
  187. self.logging_active = True
  188. self.logged_data = []
  189. self.start_logging_button.configure(state=ctk.DISABLED)
  190. self.end_logging_button.configure(state=ctk.NORMAL)
  191. def end_logging(self):
  192. self.logging_active = False
  193. self.start_logging_button.configure(state=ctk.NORMAL)
  194. self.end_logging_button.configure(state=ctk.DISABLED)
  195. self.save_data_button.configure(state=ctk.NORMAL)
  196. def save_data(self):
  197. # Popup for file name
  198. ####print(123)
  199. file_name = ctk.CTkInputDialog(text="Enter file name:", title="Save Data").get_input()
  200. if file_name:
  201. try:
  202. with open(f"{file_name}.csv", "w", newline="", encoding='utf-8') as csvfile:
  203. writer = csv.writer(csvfile)
  204. writer.writerow(["Timestamp", "Value"])
  205. writer.writerows(self.logged_data)
  206. self.save_data_button.configure(state=ctk.DISABLED)
  207. except Exception as e:
  208. print(f"Error saving data: {e}")
  209. def update_data(self):
  210. # Get EKG Data
  211. timestamp, value = get_data()
  212. # Collect data points into batches
  213. self.current_batch.append((timestamp, value))
  214. if len(self.current_batch) == self.batch_size:
  215. # Process the batch (e.g., plot it)
  216. #self.multi_data.append((timestamp, value))
  217. self.process_batch(self.current_batch)
  218. # Reset the current batch
  219. self.current_batch = []
  220. # Update Multi Sweep Data
  221. #self.multi_data.append((timestamp, value))
  222. try:
  223. if timestamp - self.multi_data[0][0] > self.multi_sweep_duration:
  224. self.multi_data.pop(0)
  225. except:
  226. pass
  227. # Trigger Logic
  228. if self.trigger_armed and not self.trigger_paused and not self.cooldown_active:
  229. condition = (
  230. value >= self.trigger_level and self.last_value < self.trigger_level) if self.flank_var.get() == "Steigende Flanke" else (
  231. value <= self.trigger_level and self.last_value > self.trigger_level)
  232. if condition:
  233. self.last_trigger_timestamp = timestamp
  234. self.single_data = []
  235. self.trigger_count = 0
  236. if self.cooldown_time > 0:
  237. self.cooldown_active = True
  238. self.master.after(int(self.cooldown_time * 1000), self.end_cooldown)
  239. # Update Single Sweep Data
  240. if self.last_trigger_timestamp is not None and self.trigger_armed and not self.trigger_paused:
  241. self.single_data.append((timestamp, value))
  242. # Update Logged Data
  243. if self.logging_active:
  244. self.logged_data.append((timestamp, value))
  245. # Store last value and update trigger count
  246. self.last_value = value
  247. if self.trigger_armed and not self.trigger_paused:
  248. self.trigger_count += 1
  249. # Call again after 1ms
  250. self.master.after(1,self.update_data)
  251. def end_cooldown(self):
  252. self.cooldown_active = False
  253. def process_batch(self, batch):
  254. global i
  255. i=i+1
  256. #print(i)
  257. # Example: Calculate average value for the batch
  258. total_value = sum(value for _, value in batch)
  259. average_value = total_value / len(batch)
  260. for timestamp, value in batch:
  261. self.multi_data.append((timestamp, value))
  262. ###print(f"Average value for batch: {average_value}")
  263. self.draw_multi_canvas()
  264. # Draw Canvases
  265. self.draw_single_canvas()
  266. # You can implement your own logic here, such as plotting the batch
  267. # on a separate canvas or performing other calculations.
  268. def draw_single_canvas(self):
  269. self.single_canvas.delete("all")
  270. if not self.single_data or self.last_trigger_timestamp is None:
  271. return
  272. # Prepare coordinates for create_line
  273. coords = []
  274. x_scale = self.single_canvas.winfo_width() / self.multi_sweep_duration
  275. y_scale = self.single_canvas.winfo_height() / 100
  276. last_x, last_y = None, None
  277. for timestamp, value in self.single_data:
  278. x = (timestamp - self.last_trigger_timestamp) * x_scale
  279. y = self.single_canvas.winfo_height() - (value * self.single_canvas.winfo_height() / 100)
  280. if last_x is not None:
  281. coords.extend([last_x, last_y, x, y])
  282. last_x, last_y = x, y
  283. # Draw the lines
  284. if coords:
  285. self.single_canvas.create_line(*coords, fill="blue")
  286. def draw_multi_canvas(self):
  287. self.multi_canvas.delete("all")
  288. if not self.multi_data:
  289. return
  290. duration = max(self.multi_data[-1][0] - self.multi_data[0][0], 0.001)
  291. x_scale = self.multi_canvas.winfo_width() / duration
  292. y_scale = self.multi_canvas.winfo_height() / 100
  293. coords = []
  294. for timestamp, value in self.multi_data:
  295. x = (timestamp - self.multi_data[0][0]) * x_scale
  296. y = self.multi_canvas.winfo_height() - (value * y_scale)
  297. coords.extend([x, y])
  298. self.multi_canvas.create_line(*coords, fill="blue")
  299. trigger_y = self.multi_canvas.winfo_height() - (self.trigger_level * y_scale)
  300. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y + 5, fill="red")
  301. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y - 5, fill="red")
  302. def draw_time_axis(self, canvas, start_time, duration):
  303. for i in range(int(duration) + 1):
  304. x = i * canvas.winfo_width() / duration
  305. canvas.create_line(x, 0, x, canvas.winfo_height(), fill="gray", dash=(2, 2))
  306. canvas.create_text(x, canvas.winfo_height() - 10, text=f"{i:.1f}s", anchor=ctk.N)
  307. def on_resize(self, event):
  308. self.draw_single_canvas()
  309. self.draw_multi_canvas()
  310. i=0
  311. if __name__ == "__main__":
  312. #ctk.set_appearance_mode("dark") # Modes: "System" (standard), "Dark", "Light"
  313. ctk.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
  314. root = ctk.CTk()
  315. app = EKGApp(root)
  316. root.mainloop()