#!/bin/python3
import time
import math
import threading
import traceback
import queue
import numpy as np
import cv2
import cv2.aruco as aruco

# When imported as part of the application (not run as a stand-alone
# script), pull in the project-local calibration state machine and the
# shared log handler. Running as a script skips these (see __main__ block).
if __name__ != "__main__":
    from sensors.calibration import CalibrationStateMashine
    import logHandler

# Shared 4x4-bit ArUco dictionary with 50 ids; find_marker() keeps ids 0-3.
# NOTE(review): Dictionary_get/DetectorParameters_create are the legacy
# (pre-4.7) OpenCV aruco API — confirm the installed OpenCV version.
aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_50)
  14. def saveMarkers():
  15. image = np.zeros((400,400), np.uint8)
  16. image[:,:] = (255)
  17. image[ 50: 50 + 100, 50: 50 + 100] = aruco_dict.drawMarker(0, 100)
  18. image[ 50: 50 + 100,-50 - 100:-50] = aruco_dict.drawMarker(1, 100)
  19. cv2.imwrite("markers.png", image)
# Default detector parameters, shared by every find_marker() call.
parameters = aruco.DetectorParameters_create()
  21. def find_marker(image, debug=True):
  22. # Our operations on the frame come here
  23. gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  24. im_bw = cv2.equalizeHist(gray)
  25. if debug:
  26. image[:,:,0] = im_bw
  27. image[:,:,1] = im_bw
  28. image[:,:,2] = im_bw
  29. corners, ids, rejectedImgPoints = aruco.detectMarkers(im_bw, aruco_dict, parameters=parameters)
  30. markers = [None] * 4
  31. if not corners:
  32. return markers
  33. for corner, id in zip(corners, ids.flatten()):
  34. # draw the bounding box of the ArUCo detection
  35. c = corner[0]
  36. cX = sum(c[:,0]) / 4
  37. cY = sum(c[:,1]) / 4
  38. if debug:
  39. cv2.line(image, tuple(c[0]), tuple(c[1]), (0, 255, 0), 2)
  40. cv2.line(image, tuple(c[1]), tuple(c[2]), (0, 255, 0), 2)
  41. cv2.line(image, tuple(c[2]), tuple(c[3]), (0, 255, 0), 2)
  42. cv2.line(image, tuple(c[3]), tuple(c[0]), (0, 255, 0), 2)
  43. cv2.putText(image, str(id), (int(c[0][0]), int(c[0][1]) - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
  44. cv2.circle(image, (int(cX), int(cY)), 4, (0, 0, 255), -1)
  45. if id < 4:
  46. markers[id] = (cX, cY)
  47. return markers
  48. def measureDistances(image, debug=True):
  49. markers = find_marker(image, debug)
  50. if markers[0] and markers[1]:
  51. distance = np.sqrt((markers[0][0]-markers[1][0])**2 + (markers[0][1]-markers[1][1])**2)
  52. centerX = (markers[0][0]+markers[1][0])/2
  53. centerY = (markers[0][1]+markers[1][1])/2
  54. if debug:
  55. cv2.line(image, (int(markers[0][0]),int(markers[0][1])), (int(markers[1][0]),int(markers[1][1])), (255, 0, 0), 2)
  56. cv2.line(image, (0, 20), (int(centerX), 20), (255, 0, 0), 2)
  57. cv2.line(image, (int(centerX), int(centerY)), (int(centerX), 0), (0, 255, 0), 2)
  58. cv2.circle(image, (int(centerX), int(centerY)), 4, (0, 0, 255), -1)
  59. cv2.putText(image, "%.2fpx" % (distance), (int(markers[0][0]), int(markers[0][1]+25)), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
  60. cv2.putText(image, "%.2fpx" % (centerX) , (25, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
  61. return centerX/image.shape[1], distance/image.shape[1]
  62. else:
  63. return None
  64. class OpticalSensor():
  65. def __init__(self, conf):
  66. self.conf = conf
  67. self.queue = queue.Queue()
  68. self.calibration_state = CalibrationStateMashine()
  69. self.field_height = float(conf["field"]["y"])
  70. self.field_width = float(conf["field"]["x"])
  71. self.target_distance = float(conf["opt_sensor"]["target_distance"])
  72. self.x_offset = float(conf["opt_sensor"]["x_offset"])
  73. self.fov = float(conf["opt_sensor"]["fov"])
  74. self.log_handler = logHandler.get_log_handler()
  75. self._t = None
  76. self.success = False
  77. self.running = True
  78. self.showImage = True
  79. def start(self):
  80. self.log_handler.log_and_print("start optical sensor")
  81. if not self._t:
  82. self._t = threading.Thread(target=self._getFrames, args=())
  83. self._t.start()
  84. def _getFrames(self):
  85. self.cap = cv2.VideoCapture(self.conf["opt_sensor"]["capture_device"])
  86. while self.running:
  87. self.success, image = self.cap.read()
  88. if self.success:
  89. values = measureDistances(image, self.showImage)
  90. if values:
  91. self.calibrate(values)
  92. position = self.calculate_position(values)
  93. if position != None:
  94. self.pass_to_gui(position + values)
  95. if self.showImage:
  96. cv2.imshow("image", image)
  97. if cv2.waitKey(1) & 0xFF == ord('q'):
  98. self.showImage = False
  99. cv2.destroyAllWindows()
  100. else:
  101. self.log_handler.log_and_print("optical sensor: image capture error")
  102. time.sleep(15)
  103. def calibrate(self, values):
  104. pass
  105. def stop(self):
  106. self.running = False
  107. if self._t:
  108. self._t.join()
  109. self.log_handler.log_and_print("stop optical sensor")
  110. def calculate_position(self, values):
  111. centerX_perc, distance_perc = values
  112. visible_width_at_y = self.target_distance / distance_perc
  113. x = self.x_offset + visible_width_at_y * (centerX_perc-0.5)
  114. alpha = (self.fov/2) / 180 * math.pi
  115. beta = (90 - self.fov/2) / 180 * math.pi
  116. y = visible_width_at_y/2 / math.sin(alpha) * math.sin(beta)
  117. return (x, y)
  118. def pass_to_gui(self, data):
  119. self.queue.put(("data", data))
  120. if __name__ == "__main__":
  121. saveMarkers()
  122. cap = cv2.VideoCapture(1)
  123. while True:
  124. success, image = cap.read()
  125. print(measureDistances(image))
  126. cv2.imshow("image", image)
  127. if cv2.waitKey(1) & 0xFF == ord('q'):
  128. break