opticalSensor.py

#!/bin/python3
import time
import math
import threading
import traceback
import queue

import numpy as np
import cv2
import cv2.aruco as aruco

# These imports depend on the package layout of the full application and are
# skipped when the module is run standalone for testing.
if __name__ != "__main__":
    from sensors.calibration import CalibrationStateMashine
    import logHandler

# 4x4 dictionary with 50 markers; only ids 0-3 are tracked below.
aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_50)

def saveMarkers():
    # Render markers 0 and 1 side by side on a white 400x400 canvas
    # and write the sheet to markers.png.
    image = np.zeros((400, 400), np.uint8)
    image[:, :] = 255

    image[50:50 + 100, 50:50 + 100] = aruco_dict.drawMarker(0, 100)
    image[50:50 + 100, -50 - 100:-50] = aruco_dict.drawMarker(1, 100)

    cv2.imwrite("markers.png", image)

parameters = aruco.DetectorParameters_create()

def find_marker(image, debug=True):
    # Detect ArUco markers on a grayscale copy of the frame.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    if debug:
        # Show the grayscale image in the debug view while keeping three
        # channels so the colored overlays below remain visible.
        image[:, :, 0] = gray
        image[:, :, 1] = gray
        image[:, :, 2] = gray

    corners, ids, rejectedImgPoints = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)

    # markers[i] holds the center of the marker with id i, or None if unseen.
    markers = [None] * 4
    if not corners:
        return markers

    for corner, marker_id in zip(corners, ids.flatten()):
        c = corner[0]
        cX = sum(c[:, 0]) / 4
        cY = sum(c[:, 1]) / 4
        if debug:
            # Draw the bounding box, id and center of the ArUco detection.
            pts = c.astype(int)
            cv2.line(image, tuple(pts[0]), tuple(pts[1]), (0, 255, 0), 2)
            cv2.line(image, tuple(pts[1]), tuple(pts[2]), (0, 255, 0), 2)
            cv2.line(image, tuple(pts[2]), tuple(pts[3]), (0, 255, 0), 2)
            cv2.line(image, tuple(pts[3]), tuple(pts[0]), (0, 255, 0), 2)
            cv2.putText(image, str(marker_id), (int(c[0][0]), int(c[0][1]) - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            cv2.circle(image, (int(cX), int(cY)), 4, (0, 0, 255), -1)
        if marker_id < 4:
            markers[marker_id] = (cX, cY)
    return markers

def measureDistances(image, debug=True):
    # Returns (center_x, distance) between markers 0 and 1, both normalized
    # by the image width, or None if one of the two markers was not found.
    markers = find_marker(image, debug)

    if markers[0] and markers[1]:
        distance = np.sqrt((markers[0][0] - markers[1][0])**2 + (markers[0][1] - markers[1][1])**2)
        centerX = (markers[0][0] + markers[1][0]) / 2
        centerY = (markers[0][1] + markers[1][1]) / 2

        if debug:
            cv2.line(image, (int(markers[0][0]), int(markers[0][1])), (int(markers[1][0]), int(markers[1][1])), (255, 0, 0), 2)
            cv2.line(image, (0, 20), (int(centerX), 20), (255, 0, 0), 2)
            cv2.line(image, (int(centerX), int(centerY)), (int(centerX), 0), (0, 255, 0), 2)
            cv2.circle(image, (int(centerX), int(centerY)), 4, (0, 0, 255), -1)
            cv2.putText(image, "%.2fpx" % (distance), (int(markers[0][0]), int(markers[0][1] + 25)), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
            cv2.putText(image, "%.2fpx" % (centerX), (25, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)

        return centerX / image.shape[1], distance / image.shape[1]
    else:
        return None

class OpticalSensor():
    def __init__(self, conf):
        self.conf = conf
        self.queue = queue.Queue()
        self.calibration_state = CalibrationStateMashine()
        self.field_height = float(conf["field"]["y"])
        self.field_width = float(conf["field"]["x"])
        self.target_distance = float(conf["opt_sensor"]["target_distance"])
        self.x_offset = float(conf["opt_sensor"]["x_offset"])
        self.fov = float(conf["opt_sensor"]["fov"])
        self.log_handler = logHandler.get_log_handler()
        self._t = None
        self.success = False
        self.running = True
        self.showImage = True

    def start(self):
        self.log_handler.log_and_print("start optical sensor")
        if not self._t:
            self._t = threading.Thread(target=self._getFrames, args=())
            self._t.start()

    def _getFrames(self):
        # The capture device may be configured as an index or as a device path/URL.
        try:
            dev = int(self.conf["opt_sensor"]["capture_device"])
        except ValueError:
            dev = self.conf["opt_sensor"]["capture_device"]
        self.cap = cv2.VideoCapture(dev)

        while self.running:
            self.success, image = self.cap.read()
            if self.success:
                values = measureDistances(image, self.showImage)
                if values:
                    self.calibrate(values)
                    position = self.calculate_position(values)
                    if position is not None:
                        self.pass_to_gui(position + values)
                if self.showImage:
                    cv2.imshow("image", image)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        self.showImage = False
                        cv2.destroyAllWindows()
            else:
                self.log_handler.log_and_print("optical sensor: image capture error")
                time.sleep(15)

        self.cap.release()

    def calibrate(self, values):
        # Placeholder: calibration of the optical sensor is not implemented yet.
        pass

    def stop(self):
        self.running = False
        if self._t:
            self._t.join()
        self.log_handler.log_and_print("stop optical sensor")

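    # Worked example for calculate_position, assuming target_distance is the
    # real-world spacing of markers 0 and 1 and fov is the camera's horizontal
    # field of view in degrees. The numbers are illustrative, not taken from
    # the project configuration: with fov = 60, target_distance = 0.5 m and a
    # measured distance_perc of 0.25, the camera sees a strip
    # 0.5 / 0.25 = 2.0 m wide at the marker plane, and the distance to that
    # plane is y = 1.0 / sin(30°) * sin(60°) ≈ 1.73 m.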
    def calculate_position(self, values):
        centerX_perc, distance_perc = values
        # Width of the camera's view at the marker plane, in the same units
        # as target_distance.
        visible_width_at_y = self.target_distance / distance_perc
        x = self.x_offset + visible_width_at_y * (centerX_perc - 0.5)
        # Distance from the camera to the marker plane, from half the field of view.
        alpha = (self.fov / 2) / 180 * math.pi
        beta = (90 - self.fov / 2) / 180 * math.pi
        y = visible_width_at_y / 2 / math.sin(alpha) * math.sin(beta)
        return (x, y)

    def pass_to_gui(self, data):
        self.queue.put(("data", data))

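# Sketch of how a consumer might read the sensor output. The GUI side is not
# part of this file, so the loop below is illustrative only; conf stands for
# the application's configuration mapping.
#
#     sensor = OpticalSensor(conf)
#     sensor.start()
#     while True:
#         kind, data = sensor.queue.get()   # kind is always "data" here
#         x, y, centerX_perc, distance_perc = data
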
if __name__ == "__main__":
    # Standalone test mode: write the marker sheet and show live measurements
    # from capture device 1.
    saveMarkers()
    cap = cv2.VideoCapture(1)

    while True:
        success, image = cap.read()
        if not success:
            continue
        print(measureDistances(image))
        cv2.imshow("image", image)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()