|
@@ -1,13 +1,19 @@
|
|
#!/bin/python3
|
|
#!/bin/python3
|
|
|
|
|
|
import time
|
|
import time
|
|
|
|
+import math
|
|
import threading
|
|
import threading
|
|
import traceback
|
|
import traceback
|
|
|
|
+import queue
|
|
|
|
|
|
import numpy as np
|
|
import numpy as np
|
|
import cv2
|
|
import cv2
|
|
import cv2.aruco as aruco
|
|
import cv2.aruco as aruco
|
|
|
|
|
|
|
|
+if __name__ != "__main__":
|
|
|
|
+ from sensors.calibration import CalibrationStateMashine
|
|
|
|
+ import logHandler
|
|
|
|
+
|
|
aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_50)
|
|
aruco_dict = aruco.Dictionary_get(aruco.DICT_4X4_50)
|
|
|
|
|
|
|
|
|
|
@@ -16,18 +22,20 @@ def saveMarkers():
|
|
image[:,:] = (255)
|
|
image[:,:] = (255)
|
|
|
|
|
|
image[ 50: 50 + 100, 50: 50 + 100] = aruco_dict.drawMarker(0, 100)
|
|
image[ 50: 50 + 100, 50: 50 + 100] = aruco_dict.drawMarker(0, 100)
|
|
- image[-50 - 100:-50, 50: 50 + 100] = aruco_dict.drawMarker(1, 100)
|
|
|
|
- image[ 50: 50 + 100,-50 - 100:-50] = aruco_dict.drawMarker(2, 100)
|
|
|
|
- image[-50 - 100:-50,-50 - 100:-50] = aruco_dict.drawMarker(3, 100)
|
|
|
|
|
|
+ image[ 50: 50 + 100,-50 - 100:-50] = aruco_dict.drawMarker(1, 100)
|
|
|
|
|
|
cv2.imwrite("markers.png", image)
|
|
cv2.imwrite("markers.png", image)
|
|
|
|
|
|
parameters = aruco.DetectorParameters_create()
|
|
parameters = aruco.DetectorParameters_create()
|
|
|
|
|
|
|
|
|
|
-def find_marker(image):
|
|
|
|
|
|
+def find_marker(image, debug=True):
|
|
# Our operations on the frame come here
|
|
# Our operations on the frame come here
|
|
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
|
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
|
|
|
+ if debug:
|
|
|
|
+ image[:,:,0] = gray
|
|
|
|
+ image[:,:,1] = gray
|
|
|
|
+ image[:,:,2] = gray
|
|
corners, ids, rejectedImgPoints = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
|
|
corners, ids, rejectedImgPoints = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
|
|
|
|
|
|
markers = [None] * 4
|
|
markers = [None] * 4
|
|
@@ -38,15 +46,16 @@ def find_marker(image):
|
|
for corner, id in zip(corners, ids.flatten()):
|
|
for corner, id in zip(corners, ids.flatten()):
|
|
# draw the bounding box of the ArUCo detection
|
|
# draw the bounding box of the ArUCo detection
|
|
c = corner[0]
|
|
c = corner[0]
|
|
- cv2.line(image, tuple(c[0]), tuple(c[1]), (0, 255, 0), 2)
|
|
|
|
- cv2.line(image, tuple(c[1]), tuple(c[2]), (0, 255, 0), 2)
|
|
|
|
- cv2.line(image, tuple(c[2]), tuple(c[3]), (0, 255, 0), 2)
|
|
|
|
- cv2.line(image, tuple(c[3]), tuple(c[0]), (0, 255, 0), 2)
|
|
|
|
- cv2.putText(image, str(id), (int(c[0][0]), int(c[0][1]) - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
|
|
|
|
-
|
|
|
|
cX = sum(c[:,0]) / 4
|
|
cX = sum(c[:,0]) / 4
|
|
cY = sum(c[:,1]) / 4
|
|
cY = sum(c[:,1]) / 4
|
|
- cv2.circle(image, (int(cX), int(cY)), 4, (0, 0, 255), -1)
|
|
|
|
|
|
+
|
|
|
|
+ if debug:
|
|
|
|
+ cv2.line(image, tuple(c[0]), tuple(c[1]), (0, 255, 0), 2)
|
|
|
|
+ cv2.line(image, tuple(c[1]), tuple(c[2]), (0, 255, 0), 2)
|
|
|
|
+ cv2.line(image, tuple(c[2]), tuple(c[3]), (0, 255, 0), 2)
|
|
|
|
+ cv2.line(image, tuple(c[3]), tuple(c[0]), (0, 255, 0), 2)
|
|
|
|
+ cv2.putText(image, str(id), (int(c[0][0]), int(c[0][1]) - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
|
|
|
|
+ cv2.circle(image, (int(cX), int(cY)), 4, (0, 0, 255), -1)
|
|
|
|
|
|
if id < 4:
|
|
if id < 4:
|
|
markers[id] = (cX, cY)
|
|
markers[id] = (cX, cY)
|
|
@@ -54,63 +63,105 @@ def find_marker(image):
|
|
return markers
|
|
return markers
|
|
|
|
|
|
|
|
|
|
def measureDistances(image, debug=True):
    """Locate markers 0 and 1 in *image* and measure their separation.

    Parameters
    ----------
    image : ndarray (BGR frame)
        Frame to analyse; when *debug* is true it is annotated in place.
    debug : bool
        Draw overlay lines/labels onto *image* for visual inspection.

    Returns
    -------
    tuple or None
        ``(center_x_frac, distance_frac)`` — the midpoint x-coordinate and
        the marker-to-marker distance, both normalized by the image width
        so the result is resolution independent — or ``None`` when fewer
        than both required markers were detected.
    """
    markers = find_marker(image, debug)

    # Explicit None checks: a marker slot is either None or an (x, y) tuple.
    if markers[0] is not None and markers[1] is not None:
        (x0, y0) = markers[0]
        (x1, y1) = markers[1]

        distance = np.sqrt((x0 - x1) ** 2 + (y0 - y1) ** 2)
        centerX = (x0 + x1) / 2
        centerY = (y0 + y1) / 2

        if debug:
            # Line between the two markers, plus crosshair/readouts.
            cv2.line(image, (int(x0), int(y0)), (int(x1), int(y1)), (255, 0, 0), 2)
            cv2.line(image, (0, 20), (int(centerX), 20), (255, 0, 0), 2)
            cv2.line(image, (int(centerX), int(centerY)), (int(centerX), 0), (0, 255, 0), 2)
            cv2.circle(image, (int(centerX), int(centerY)), 4, (0, 0, 255), -1)
            cv2.putText(image, "%.2fpx" % (distance), (int(x0), int(y0 + 25)), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
            cv2.putText(image, "%.2fpx" % (centerX), (25, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)

        # Normalize by image width (shape[1] = number of columns).
        return centerX / image.shape[1], distance / image.shape[1]
    else:
        return None
|
|
|
|
|
|
|
|
|
class OpticalSensor():
    """Camera-based sensor that tracks a pair of ArUco markers in a
    background thread and publishes computed positions to a GUI queue.

    Configuration is read from *conf* (``field`` and ``opt_sensor``
    sections); results are emitted as ``("data", ...)`` tuples on
    ``self.queue``.
    """

    def __init__(self, conf):
        self.conf = conf
        self.queue = queue.Queue()  # outgoing ("data", payload) messages for the GUI
        self.calibration_state = CalibrationStateMashine()
        self.field_height = float(conf["field"]["y"])
        self.field_width = float(conf["field"]["x"])
        self.target_distance = float(conf["opt_sensor"]["target_distance"])
        self.x_offset = float(conf["opt_sensor"]["x_offset"])
        self.fov = float(conf["opt_sensor"]["fov"])  # horizontal field of view, degrees
        self.log_handler = logHandler.get_log_handler()
        self._t = None          # capture thread, created lazily in start()
        self.success = False    # result of the most recent cap.read()
        self.running = True     # cleared by stop() to end the capture loop
        self.showImage = True   # live debug window; disabled by pressing 'q'

    def start(self):
        """Launch the background capture thread (idempotent: a second
        call while the thread exists does nothing)."""
        self.log_handler.log_and_print("start optical sensor")
        if not self._t:
            self._t = threading.Thread(target=self._getFrames, args=())
            self._t.start()

    def _getFrames(self):
        """Thread body: open the camera, then grab/measure/publish until
        ``self.running`` is cleared."""
        # The configured capture device may be an integer index or a
        # device path/URL string; try the numeric form first.
        try:
            dev = int(self.conf["opt_sensor"]["capture_device"])
        except ValueError:
            dev = self.conf["opt_sensor"]["capture_device"]
        self.cap = cv2.VideoCapture(dev)

        while self.running:
            self.success, image = self.cap.read()
            if self.success:
                values = measureDistances(image, self.showImage)
                if values:
                    self.calibrate(values)
                    position = self.calculate_position(values)
                    if position is not None:
                        self.pass_to_gui(position + values)
                if self.showImage:
                    cv2.imshow("image", image)
                    # 'q' closes the debug window but keeps measuring.
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        self.showImage = False
                        cv2.destroyAllWindows()
            else:
                self.log_handler.log_and_print("optical sensor: image capture error")
                time.sleep(15)  # back off before retrying the camera

        # Release the camera once the loop ends so the device is reusable.
        self.cap.release()

    def calibrate(self, values):
        # Placeholder: calibration driven by self.calibration_state.
        pass

    def stop(self):
        """Signal the capture loop to end and wait for the thread to exit."""
        self.running = False
        if self._t:
            self._t.join()
        self.log_handler.log_and_print("stop optical sensor")

    def calculate_position(self, values):
        """Convert normalized measurements into an (x, y) field position.

        *values* is ``(centerX_perc, distance_perc)`` as returned by
        ``measureDistances``; both are fractions of the image width.
        """
        centerX_perc, distance_perc = values

        # The real marker separation (target_distance) spans distance_perc
        # of the image, so the full visible width at that depth follows by
        # proportionality.
        visible_width_at_y = self.target_distance / distance_perc
        x = self.x_offset + visible_width_at_y * (centerX_perc - 0.5)

        # Depth from the visible width via the camera's horizontal FOV.
        alpha = (self.fov / 2) / 180 * math.pi
        beta = (90 - self.fov / 2) / 180 * math.pi
        y = visible_width_at_y / 2 / math.sin(alpha) * math.sin(beta)

        return (x, y)

    def pass_to_gui(self, data):
        # Tag the payload so the GUI can dispatch on message type.
        self.queue.put(("data", data))
|
|
|
|
|
if __name__ == "__main__":
    # Standalone debug mode: regenerate the marker sheet, then show live
    # measurements from capture device 1 until 'q' is pressed.
    saveMarkers()
    cap = cv2.VideoCapture(1)
    while True:
        success, image = cap.read()
        if not success:
            # Without this guard a failed read passes image=None into
            # measureDistances and crashes inside cvtColor.
            print("capture error")
            break
        print(measureDistances(image))
        cv2.imshow("image", image)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    # Release the camera and close the debug window on exit.
    cap.release()
    cv2.destroyAllWindows()