123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import time
- import statistics
- import math
- import threading
- import random
- import traceback
- from sensors.connection import globalArduinoSlave
- import logHandler
- conn = globalArduinoSlave()
- class AcusticSensor:
- def __init__(self, conf, ac_queue, calibration_state):
- self.conf = conf
- self.ac_queue = ac_queue
- self.calibration_state = calibration_state
- self.field_height = float(conf["field"]["y"])
- self.field_width = float(conf["field"]["x"])
- self.sensor_y_offset = float(conf["ac_sensor"]["y_offset"])
- self.left_sensor_x_offset = float(conf["ac_sensor"]["left_x_offset"])
- self.right_sensor_x_offset = float(conf["ac_sensor"]["right_x_offset"])
- self.sensor_distance = self.field_width - self.left_sensor_x_offset + self.right_sensor_x_offset
- self.sonic_speed = float(conf["ac_sensor"]["sonicspeed"])
- self.overhead_left = float(conf["ac_sensor"]["overhead_left"])
- self.overhead_right = float(conf["ac_sensor"]["overhead_right"])
- self.log_handler = logHandler.get_log_handler()
- self.log_handler.add_item("start ac_sensor")
- # temporary calibration variables
- self.time_vals = [[],[]]
- self.cal_values = {
- "left": [0, 0],
- "right": [0, 0]
- }
- self.n = 0
- def start(self):
- if not conn.isConnected():
- conn.open(port = self.conf["arduino"]["port"])
- conn.addRecvCallback(self._readCb)
- # generate dummy values until arduino is ready
- self.dummyActive = True
- dummyThread = threading.Thread(target=self._readCb_dummy)
- dummyThread.start()
-
- def start_calibration(self):
- self.calibration_state.reset_state()
- self.time_vals = [[],[]]
- self.calibration_state.next_state()
- def stop(self):
- print("stop acoustic sensor")
- self.dummyActive = False
- conn.close()
- def _readCb_dummy(self):
- while self.dummyActive:
- value = (900+random.randint(0,300),900+random.randint(0,300))
- value = ((math.sin(self.n)+1)*400+900, (math.cos(self.n)+1)*400+900)
- self.n += 0.02
- if self.calibration_state.get_state() == self.calibration_state.ACCUMULATING_1:
- value = (1541+random.randint(-50,50),2076+random.randint(-50,50))
- elif self.calibration_state.get_state() == self.calibration_state.ACCUMULATING_2:
- value = (2076+random.randint(-50,50),1541+random.randint(-50,50))
- self.calibrate(value)
- self.pass_to_gui(self.calculate_position(value) + value)
- time.sleep(0.01)
- def _readCb(self, raw):
- self.dummyActive = False
- value = conn.getAcusticRTTs()
- # partially missing values will be ignored
- if value[0] >= 0 and value[1] >= 0:
- self.calibrate(value)
- position = self.calculate_position(value)
- if position != None:
- self.pass_to_gui(position + value)
- def calibrate(self, value):
- if self.calibration_state.get_state() == self.calibration_state.ACCUMULATING_1:
- self.time_vals[0].append(value[0])
- self.time_vals[1].append(value[1])
- self.calibration_state.progress = len(self.time_vals[0]) / 2
- if len(self.time_vals[0]) >= 100:
- self.cal_values["left"][0] = statistics.mean(self.time_vals[0])
- self.cal_values["right"][1] = statistics.mean(self.time_vals[1])
- self.time_vals = [[],[]]
- self.calibration_state.next_state() # signal gui to get next position
- elif self.calibration_state.get_state() == self.calibration_state.ACCUMULATING_2:
- self.time_vals[0].append(value[0])
- self.time_vals[1].append(value[1])
- self.calibration_state.progress = 50 + len(self.time_vals[0]) / 2
- if len(self.time_vals[0]) >= 100:
- self.cal_values["left"][1] = statistics.mean(self.time_vals[0])
- self.cal_values["right"][0] = statistics.mean(self.time_vals[1])
- # all values have been captured
- print("calibration measurements:", self.cal_values)
- self.log_handler.add_item("calibration measurements:", self.cal_values)
-
- # calculate distances from config
- # /| _.-|
- # d1 / | d2 _.-` |
- # / | y_off + height _.-` | y_off + height
- # /___| -____________|
- # x_off x_off + width
- distance_1 = math.sqrt(self.left_sensor_x_offset**2 + (self.sensor_y_offset + self.field_height)**2 )
- distance_2 = math.sqrt((self.left_sensor_x_offset + self.field_width)**2 + (self.sensor_y_offset + self.field_height)**2 )
- distancedif = distance_2 - distance_1
- timedif = self.cal_values["left"][1] - self.cal_values["left"][0]
- # speed of sound in mm/us
- sonicspeed_1 = distancedif / timedif
- # processing time overhead in us
- overhead_1 = statistics.mean((self.cal_values["left"][1] - distance_1/sonicspeed_1, self.cal_values["left"][0] - distance_2/sonicspeed_1))
- # same for the second set of values
- distance_1 = math.sqrt(self.right_sensor_x_offset**2 + (self.sensor_y_offset + self.field_height)**2 )
- distance_2 = math.sqrt((self.right_sensor_x_offset + self.field_width)**2 + (self.sensor_y_offset + self.field_height)**2 )
- distancedif = distance_2 - distance_1
- timedif = self.cal_values["right"][1] - self.cal_values["right"][0]
- sonicspeed_2 = distancedif / timedif
- overhead_2 = statistics.mean((self.cal_values["right"][0] - distance_1/sonicspeed_2, self.cal_values["right"][1] - distance_2/sonicspeed_2))
- # calculate calibration results
- self.sonic_speed = statistics.mean((sonicspeed_1,sonicspeed_2))
- self.overhead_left = overhead_1
- self.overhead_right = overhead_2
- print("calibration results:")
- print(" sonicspeed: {:8.6f} mm/us".format(self.sonic_speed))
- print(" overhead_left: {:8.3f} us".format(self.overhead_left))
- print(" overhead_right: {:8.3f} us".format(self.overhead_right))
- self.log_handler.add_item("calibration results:")
- self.log_handler.add_item(" sonicspeed: {:8.6f} mm/us".format(self.sonic_speed))
- self.log_handler.add_item(" overhead_left: {:8.3f} us".format(self.overhead_left))
- self.log_handler.add_item(" overhead_right: {:8.3f} us".format(self.overhead_right))
- self.calibration_state.next_state()
- def read(self):
- value = conn.getAcusticRTTs()
- return value
- def calculate_position(self,values):
- try:
- val1, val2 = values
- val1 -= self.overhead_left
- val2 -= self.overhead_right
- distance_left = val1 * self.sonic_speed
- distance_right = val2 * self.sonic_speed
- # compute intersection of distance circles
- x = (self.sensor_distance**2 - distance_right**2 + distance_left**2) / (2*self.sensor_distance) + self.left_sensor_x_offset
- if distance_left**2 - x**2 >= 0:
- y = math.sqrt(distance_left**2 - x**2) - self.sensor_y_offset
- return (x, y)
- else:
- return None
- except Exception as e:
- print(values)
- traceback.print_exc()
- def pass_to_gui(self, data):
- self.ac_queue.put(("data", data))
|