public_oszi_beta.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. import customtkinter as ctk
  2. import time
  3. import math
  4. import random
  5. import csv
  6. import tkinter as tk
  7. import customtkinter as ctk
  8. import time
  9. ## cc maximilian scheinast-peter
  10. ## last update 11.04.2024
  11. ## dieses programm funktioniert ähnlich wie ein digitales Oszi zur Darstellung von Vitalkurven
  12. ## Attribution-NonCommercial license
  13. def get_data(): # example heartfunction
  14. global i
  15. i=i+1
  16. ###print(i)
  17. timestamp = time.time() ## dont change
  18. ## logic for get one datapoint like a read_value function
  19. t = (timestamp % 1) * 2 * math.pi
  20. p_wave = 5 * math.sin(t)
  21. qrs_complex = 40 * math.sin(1.5 * t) * math.exp(-0.25 * t ** 2)
  22. t_wave = 10 * math.sin(2 * t) * math.exp(-0.5 * t ** 2)
  23. value = 50 + p_wave + qrs_complex + t_wave
  24. value = max(0, min(value, 100))
  25. wait_for_next_millisecond()
  26. return timestamp, value
  27. def wait_for_next_millisecond():
  28. """Waits until the next full millisecond."""
  29. current_time = time.time()
  30. next_millisecond = (int(current_time * 1000) + 1) / 1000
  31. time.sleep(next_millisecond - current_time)
  32. # Function to generate realistic ECG data
  33. class EKGApp:
  34. def __init__(self, master):
  35. self.master = master
  36. master.title("EKG Visualization")
  37. # Main Frame
  38. main_frame = ctk.CTkFrame(master)
  39. main_frame.pack(fill=ctk.BOTH, expand=True)
  40. # Single Sweep Canvas
  41. self.single_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  42. self.single_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  43. # Multi Sweep Canvas
  44. self.multi_canvas = ctk.CTkCanvas(main_frame, width=600, height=300)
  45. self.multi_canvas.pack(side=ctk.LEFT, fill=ctk.BOTH, expand=True)
  46. # Input Frame
  47. input_frame = ctk.CTkFrame(main_frame)
  48. input_frame.pack(side=ctk.RIGHT, fill=ctk.Y)
  49. # Data Structures
  50. self.single_data = []
  51. self.multi_data = []
  52. self.logged_data = []
  53. # Time Variables
  54. self.start_time = time.time()
  55. self.multi_sweep_duration = 10 # Initial Laufbanddauer
  56. # Trigger Variables
  57. self.trigger_level = 50
  58. self.trigger_armed = False
  59. self.trigger_paused = False
  60. self.trigger_timestamp = None
  61. self.last_trigger_timestamp = None
  62. self.last_value = 0
  63. self.trigger_count = 0
  64. # Cooldown Variables
  65. self.cooldown_time = 0
  66. self.cooldown_active = False
  67. # Logging Variables
  68. self.logging_active = False
  69. # Batching Variables
  70. self.batch_size = 5
  71. self.current_batch = []
  72. # Input Fields and Buttons (with consistent size)
  73. self.create_input_fields(input_frame)
  74. # Logging Frame
  75. logging_frame = ctk.CTkFrame(master)
  76. logging_frame.pack(fill=ctk.X)
  77. self.create_logging_buttons(logging_frame)
  78. # Update Data Periodically
  79. self.update_data()
  80. # Resize Handling
  81. self.single_canvas.bind("<Configure>", self.on_resize)
  82. self.multi_canvas.bind("<Configure>", self.on_resize)
  83. def create_input_fields(self, frame):
  84. # Consistent width for all input elements
  85. input_width = 150
  86. # Trigger Level
  87. ctk.CTkLabel(frame, text="Trigger Level:").pack()
  88. self.trigger_entry = ctk.CTkEntry(frame, width=input_width)
  89. self.trigger_entry.insert(0, str(self.trigger_level))
  90. self.trigger_entry.pack()
  91. # Flank Selection
  92. ctk.CTkLabel(frame, text="Flanke:").pack()
  93. self.flank_var = ctk.StringVar(value="Steigende Flanke")
  94. self.flank_combobox = ctk.CTkComboBox(frame, variable=self.flank_var,
  95. values=["Steigende Flanke", "Fallende Flanke"], width=input_width)
  96. self.flank_combobox.pack()
  97. # Cooldown
  98. ctk.CTkLabel(frame, text="Cooldown (Sekunden):").pack()
  99. self.cooldown_entry = ctk.CTkEntry(frame, width=input_width)
  100. self.cooldown_entry.insert(0, "0")
  101. self.cooldown_entry.pack()
  102. self.cooldown_button = ctk.CTkButton(frame, text="Cooldown setzen", command=self.set_cooldown,
  103. width=input_width)
  104. self.cooldown_button.pack()
  105. # Ax Time (X-Achsen-Zeit)
  106. ctk.CTkLabel(frame, text="Ax-Zeit (Sekunden):").pack()
  107. self.ax_time_entry = ctk.CTkEntry(frame, width=input_width)
  108. self.ax_time_entry.insert(0, str(self.multi_sweep_duration)) # Initial value
  109. self.ax_time_entry.pack()
  110. self.ax_time_button = ctk.CTkButton(frame, text="Ax-Zeit setzen", command=self.set_ax_time, width=input_width)
  111. self.ax_time_button.pack()
  112. # Trigger Buttons
  113. self.trigger_button = ctk.CTkButton(frame, text="Trigger starten", command=self.toggle_trigger,
  114. width=input_width)
  115. self.trigger_button.pack()
  116. self.pause_button = ctk.CTkButton(frame, text="Trigger pausieren", command=self.pause_trigger,
  117. state=ctk.DISABLED, width=input_width)
  118. self.pause_button.pack()
  119. # Multi Sweep Duration (Laufbanddauer)
  120. ctk.CTkLabel(frame, text="Laufbanddauer (Sekunden):").pack()
  121. self.duration_entry = ctk.CTkEntry(frame, width=input_width)
  122. self.duration_entry.insert(0, "30")
  123. self.duration_entry.pack()
  124. self.duration_button = ctk.CTkButton(frame, text="Dauer setzen", command=self.set_duration, width=input_width)
  125. self.duration_button.pack()
  126. def set_ax_time(self):
  127. try:
  128. self.multi_sweep_duration = float(self.ax_time_entry.get())
  129. except ValueError:
  130. pass
  131. def create_logging_buttons(self, frame):
  132. # Consistent width for logging buttons
  133. button_width = 120
  134. self.start_logging_button = ctk.CTkButton(frame, text="Start Logging", command=self.start_logging,
  135. width=button_width)
  136. self.start_logging_button.pack(side=ctk.LEFT)
  137. self.end_logging_button = ctk.CTkButton(frame, text="End Logging", command=self.end_logging, state=ctk.DISABLED,
  138. width=button_width)
  139. self.end_logging_button.pack(side=ctk.LEFT)
  140. self.save_data_button = ctk.CTkButton(frame, text="Save Data", command=self.save_data, state=ctk.DISABLED,
  141. width=button_width)
  142. self.save_data_button.pack(side=ctk.LEFT)
  143. def set_cooldown(self):
  144. try:
  145. self.cooldown_time = float(self.cooldown_entry.get())
  146. except ValueError:
  147. pass
  148. def set_duration(self):
  149. try:
  150. self.multi_sweep_duration = float(self.duration_entry.get())
  151. except ValueError:
  152. pass
  153. def toggle_trigger(self):
  154. if self.trigger_armed:
  155. self.trigger_armed = False
  156. self.trigger_button.configure(text="Trigger starten")
  157. self.pause_button.configure(state=ctk.DISABLED)
  158. self.single_data = []
  159. self.draw_single_canvas()
  160. else:
  161. try:
  162. self.trigger_level = float(self.trigger_entry.get())
  163. except ValueError:
  164. pass
  165. self.trigger_armed = True
  166. self.trigger_button.configure(text="Trigger stoppen")
  167. self.pause_button.configure(state=ctk.NORMAL)
  168. self.trigger_count = 0
  169. def pause_trigger(self):
  170. self.trigger_paused = not self.trigger_paused
  171. if self.trigger_paused:
  172. self.pause_button.configure(text="Trigger fortsetzen")
  173. else:
  174. self.pause_button.configure(text="Trigger pausieren")
  175. def start_logging(self):
  176. self.logging_active = True
  177. self.logged_data = []
  178. self.start_logging_button.configure(state=ctk.DISABLED)
  179. self.end_logging_button.configure(state=ctk.NORMAL)
  180. def end_logging(self):
  181. self.logging_active = False
  182. self.start_logging_button.configure(state=ctk.NORMAL)
  183. self.end_logging_button.configure(state=ctk.DISABLED)
  184. self.save_data_button.configure(state=ctk.NORMAL)
  185. def save_data(self):
  186. # Popup for file name
  187. ####print(123)
  188. file_name = ctk.CTkInputDialog(text="Enter file name:", title="Save Data").get_input()
  189. if file_name:
  190. try:
  191. with open(f"{file_name}.csv", "w", newline="", encoding='utf-8') as csvfile:
  192. writer = csv.writer(csvfile)
  193. writer.writerow(["Timestamp", "Value"])
  194. writer.writerows(self.logged_data)
  195. self.save_data_button.configure(state=ctk.DISABLED)
  196. except Exception as e:
  197. print(f"Error saving data: {e}")
  198. def update_data(self):
  199. # Get EKG Data
  200. timestamp, value = get_data()
  201. # Collect data points into batches
  202. self.current_batch.append((timestamp, value))
  203. if len(self.current_batch) == self.batch_size:
  204. # Process the batch (e.g., plot it)
  205. #self.multi_data.append((timestamp, value))
  206. self.process_batch(self.current_batch)
  207. # Reset the current batch
  208. self.current_batch = []
  209. # Update Multi Sweep Data
  210. #self.multi_data.append((timestamp, value))
  211. try:
  212. if timestamp - self.multi_data[0][0] > self.multi_sweep_duration:
  213. self.multi_data.pop(0)
  214. except:
  215. pass
  216. # Trigger Logic
  217. if self.trigger_armed and not self.trigger_paused and not self.cooldown_active:
  218. condition = (
  219. value >= self.trigger_level and self.last_value < self.trigger_level) if self.flank_var.get() == "Steigende Flanke" else (
  220. value <= self.trigger_level and self.last_value > self.trigger_level)
  221. if condition:
  222. self.last_trigger_timestamp = timestamp
  223. self.single_data = []
  224. self.trigger_count = 0
  225. if self.cooldown_time > 0:
  226. self.cooldown_active = True
  227. self.master.after(int(self.cooldown_time * 1000), self.end_cooldown)
  228. # Update Single Sweep Data
  229. if self.last_trigger_timestamp is not None and self.trigger_armed and not self.trigger_paused:
  230. self.single_data.append((timestamp, value))
  231. # Update Logged Data
  232. if self.logging_active:
  233. self.logged_data.append((timestamp, value))
  234. # Store last value and update trigger count
  235. self.last_value = value
  236. if self.trigger_armed and not self.trigger_paused:
  237. self.trigger_count += 1
  238. # Call again after 1ms
  239. self.master.after(1,self.update_data)
  240. def end_cooldown(self):
  241. self.cooldown_active = False
  242. def process_batch(self, batch):
  243. global i
  244. i=i+1
  245. #print(i)
  246. # Example: Calculate average value for the batch
  247. total_value = sum(value for _, value in batch)
  248. average_value = total_value / len(batch)
  249. for timestamp, value in batch:
  250. self.multi_data.append((timestamp, value))
  251. ###print(f"Average value for batch: {average_value}")
  252. self.draw_multi_canvas()
  253. # Draw Canvases
  254. self.draw_single_canvas()
  255. # You can implement your own logic here, such as plotting the batch
  256. # on a separate canvas or performing other calculations.
  257. def draw_single_canvas(self):
  258. self.single_canvas.delete("all")
  259. if not self.single_data or self.last_trigger_timestamp is None:
  260. return
  261. # Prepare coordinates for create_line
  262. coords = []
  263. x_scale = self.single_canvas.winfo_width() / self.multi_sweep_duration
  264. y_scale = self.single_canvas.winfo_height() / 100
  265. last_x, last_y = None, None
  266. for timestamp, value in self.single_data:
  267. x = (timestamp - self.last_trigger_timestamp) * x_scale
  268. y = self.single_canvas.winfo_height() - (value * self.single_canvas.winfo_height() / 100)
  269. if last_x is not None:
  270. coords.extend([last_x, last_y, x, y])
  271. last_x, last_y = x, y
  272. # Draw the lines
  273. if coords:
  274. self.single_canvas.create_line(*coords, fill="blue")
  275. def draw_multi_canvas(self):
  276. self.multi_canvas.delete("all")
  277. if not self.multi_data:
  278. return
  279. duration = max(self.multi_data[-1][0] - self.multi_data[0][0], 0.001)
  280. x_scale = self.multi_canvas.winfo_width() / duration
  281. y_scale = self.multi_canvas.winfo_height() / 100
  282. coords = []
  283. for timestamp, value in self.multi_data:
  284. x = (timestamp - self.multi_data[0][0]) * x_scale
  285. y = self.multi_canvas.winfo_height() - (value * y_scale)
  286. coords.extend([x, y])
  287. self.multi_canvas.create_line(*coords, fill="blue")
  288. trigger_y = self.multi_canvas.winfo_height() - (self.trigger_level * y_scale)
  289. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y + 5, fill="red")
  290. self.multi_canvas.create_line(self.multi_canvas.winfo_width() - 10, trigger_y, self.multi_canvas.winfo_width(), trigger_y - 5, fill="red")
  291. def draw_time_axis(self, canvas, start_time, duration):
  292. for i in range(int(duration) + 1):
  293. x = i * canvas.winfo_width() / duration
  294. canvas.create_line(x, 0, x, canvas.winfo_height(), fill="gray", dash=(2, 2))
  295. canvas.create_text(x, canvas.winfo_height() - 10, text=f"{i:.1f}s", anchor=ctk.N)
  296. def on_resize(self, event):
  297. self.draw_single_canvas()
  298. self.draw_multi_canvas()
  299. i=0
  300. if __name__ == "__main__":
  301. ctk.set_appearance_mode("dark") # Modes: "System" (standard), "Dark", "Light"
  302. ctk.set_default_color_theme("blue") # Themes: "blue" (standard), "green", "dark-blue"
  303. root = ctk.CTk()
  304. app = EKGApp(root)
  305. root.mainloop()