import time import statistics import math import threading import random import traceback import queue from sensors.calibration import CalibrationStateMashine from sensors.connection import globalArduinoSlave import logHandler conn = globalArduinoSlave() class AcousticSensor: def __init__(self, conf): self.conf = conf self.queue = queue.Queue() self.calibration_state = CalibrationStateMashine() self.field_height = float(conf["field"]["y"]) self.field_width = float(conf["field"]["x"]) self.sensor_y_offset = float(conf["ac_sensor"]["y_offset"]) self.left_sensor_x_offset = float(conf["ac_sensor"]["left_x_offset"]) self.right_sensor_x_offset = float(conf["ac_sensor"]["right_x_offset"]) self.calibration_y_offset_1 = float(conf["ac_sensor"]["calibration_y_offset_1"]) self.calibration_y_offset_2 = float(conf["ac_sensor"]["calibration_y_offset_2"]) self.calibration_x_offset = float(conf["ac_sensor"]["calibration_x_offset"]) self.sensor_distance = self.field_width - self.left_sensor_x_offset + self.right_sensor_x_offset self.sonic_speed = float(conf["ac_sensor"]["sonicspeed"]) self.overhead_left = float(conf["ac_sensor"]["overhead_left"]) self.overhead_right = float(conf["ac_sensor"]["overhead_right"]) self.log_handler = logHandler.get_log_handler() # temporary calibration variables self.time_vals = [[],[]] self.cal_values = { "front": [0, 0], "back": [0, 0] } self.n = 0 def start(self): self.log_handler.log_and_print("start acoustic sensor") if not conn.isConnected(): conn.open(port = self.conf["arduino"]["port"]) conn.addRecvCallback(self._readCb) # generate dummy values until arduino is ready self.dummyActive = True dummyThread = threading.Thread(target=self._readCb_dummy) dummyThread.start() def start_calibration(self): self.calibration_state.reset_state() self.time_vals = [[],[]] self.calibration_state.next_state() def stop(self): self.log_handler.log_and_print("stop acoustic sensor") self.dummyActive = False conn.close() def _readCb_dummy(self): self.log_handler.log_and_print("acoustic sensor: generating test values") while self.dummyActive: if self.n % 4 < 1: dummyPosition = (0, self.n%1 * self.field_height) elif self.n % 4 < 2: dummyPosition = (self.n%1 * self.field_width, self.field_height) elif self.n % 4 < 3: dummyPosition = (self.field_width, self.field_height - self.n%1 * self.field_height) else: dummyPosition = (self.field_width - self.n%1 * self.field_width, 0) self.n += 0.01 if self.calibration_state.get_state() in [self.calibration_state.ACCUMULATING_1, self.calibration_state.WAITING_POS_1]: dummyPosition = (self.calibration_x_offset + random.randint(-5,5), self.calibration_y_offset_1 + random.randint(-5,5)) elif self.calibration_state.get_state() in [self.calibration_state.ACCUMULATING_2, self.calibration_state.WAITING_POS_2]: dummyPosition = (self.calibration_x_offset + random.randint(-5,5), self.calibration_y_offset_2 + random.randint(-5,5)) # these dummy parameters should also appear after the calibration dummyValue = self.predict_values(dummyPosition, sonic_speed=0.35, overhead_left=30, overhead_right=100) self.calibrate(dummyValue) position = self.calculate_position(dummyValue) if position != None: self.pass_to_gui(position + dummyValue) time.sleep(0.012) self.log_handler.log_and_print("acoustic sensor: disabled test mode") def _readCb(self, raw): if self.dummyActive == True: self.dummyActive = False value = conn.getAcousticRTTs() # partially missing values will be ignored if value[0] >= 0 and value[1] >= 0: self.calibrate(value) position = self.calculate_position(value) if position != None: self.pass_to_gui(position + value) def calibrate(self, value): if self.calibration_state.get_state() == self.calibration_state.ACCUMULATING_1: self.time_vals[0].append(value[0]) self.time_vals[1].append(value[1]) self.calibration_state.progress = len(self.time_vals[0]) / 2 if len(self.time_vals[0]) >= 100: self.cal_values["front"][0] = statistics.mean(self.time_vals[0]) self.cal_values["front"][1] = statistics.mean(self.time_vals[1]) self.time_vals = [[],[]] self.calibration_state.next_state() # signal gui to get next position elif self.calibration_state.get_state() == self.calibration_state.ACCUMULATING_2: self.time_vals[0].append(value[0]) self.time_vals[1].append(value[1]) self.calibration_state.progress = 50 + len(self.time_vals[0]) / 2 if len(self.time_vals[0]) >= 100: self.cal_values["back"][0] = statistics.mean(self.time_vals[0]) self.cal_values["back"][1] = statistics.mean(self.time_vals[1]) # all values have been captured self.log_handler.log_and_print("calibration measurements:", self.cal_values) # calculate distances from config # /|\ # d1 d2 d3 / | \ d4 # _..'|'.._y_off + calYoff_2 / | \y_off + calYoff_2 # /____|____\ /___|___\ # x_off x_off distance_1 = math.sqrt((self.calibration_x_offset + self.left_sensor_x_offset)**2 + (self.sensor_y_offset + self.calibration_y_offset_1)**2 ) distance_2 = math.sqrt((self.calibration_x_offset + self.left_sensor_x_offset)**2 + (self.sensor_y_offset + self.calibration_y_offset_2)**2 ) distancedif = distance_2 - distance_1 timedif = self.cal_values["back"][0] - self.cal_values["front"][0] # speed of sound in mm/us sonicspeed_1 = distancedif / timedif # same for the second set of values distance_3 = math.sqrt((self.right_sensor_x_offset + (self.field_width - self.calibration_x_offset))**2 + (self.sensor_y_offset + self.calibration_y_offset_1)**2 ) distance_4 = math.sqrt((self.right_sensor_x_offset + (self.field_width - self.calibration_x_offset))**2 + (self.sensor_y_offset + self.calibration_y_offset_2)**2 ) distancedif = distance_4 - distance_3 timedif = self.cal_values["back"][1] - self.cal_values["front"][1] sonicspeed_2 = distancedif / timedif # processing time overhead in us overhead_1 = statistics.mean((self.cal_values["front"][0] - distance_1/sonicspeed_1, self.cal_values["back"][0] - distance_2/sonicspeed_1)) overhead_2 = statistics.mean((self.cal_values["front"][1] - distance_3/sonicspeed_2, self.cal_values["back"][1] - distance_4/sonicspeed_2)) # calculate calibration results self.sonic_speed = statistics.mean((sonicspeed_1,sonicspeed_2)) self.overhead_left = overhead_1 self.overhead_right = overhead_2 self.log_handler.log_and_print("calibration results:") self.log_handler.log_and_print(" sonicspeed: {:8.6f} mm/us".format(self.sonic_speed)) self.log_handler.log_and_print(" overhead_left: {:8.3f} us".format(self.overhead_left)) self.log_handler.log_and_print(" overhead_right: {:8.3f} us".format(self.overhead_right)) self.calibration_state.next_state() def read(self): value = conn.getAcousticRTTs() return value def calculate_position(self, values): val1, val2 = values val1 -= self.overhead_left val2 -= self.overhead_right distance_left = val1 * self.sonic_speed distance_right = val2 * self.sonic_speed # compute intersection of distance circles x = (self.sensor_distance**2 - distance_right**2 + distance_left**2) / (2*self.sensor_distance) + self.left_sensor_x_offset if distance_left**2 - x**2 >= 0: y = math.sqrt(distance_left**2 - x**2) - self.sensor_y_offset return (x, y) else: return None def predict_values(self, position, sonic_speed, overhead_left, overhead_right): x, y = position distance_left = math.sqrt((y + self.sensor_y_offset)**2 + x**2) distance_right = math.sqrt(self.sensor_distance**2 + distance_left**2 - x * 2 * self.sensor_distance - x * self.left_sensor_x_offset) return (distance_left / sonic_speed + overhead_left, distance_right / sonic_speed + overhead_right) def pass_to_gui(self, data): self.queue.put(("data", data))