import queue from struct import calcsize import numpy as np import time import threading # ? import random # ? from sensors.connection import globalArduinoSlave import logHandler conn = globalArduinoSlave() class MagneticSensor: def __init__(self, conf): self.conf = conf self.queue = queue.Queue() self.success = False self.mpu_array = [] self.mpu_gyro_offsets = [0.0,0.0,0.0] self.log_handler = logHandler.get_log_handler() def start(self): if not conn.isConnected(): conn.open() conn.addRecvCallback(self._readCb) self.calibrate() #self.dummyActive = True #dummyThread = threading.Thread(target=self._readCb_dummy) #dummyThread.start() def _readCb(self, raw): #print("mag: ", conn.getMagneticField()) #print("accel: ", conn.getAccelValues()) print("gyro: ", conn.getGyroValues()) def calibrate(self, conf): # Gyroscope Calibration self.mpu_array = [] # clear Array for next calibration while True: try: gyro_x, gyro_y, gyro_z = conn.getGyroValues() except: continue self.mpu_gyro_array.append([gyro_x,gyro_y,gyro_z]) if np.shape(self.mpu_array)[0] == conf.measurements_gyro: for qq in range(0,3): self.mpu_gyro_offsets[qq] = np.mean(np.array(self.mpu_array)[:,qq]) # average break print('Gyro Calibration Complete') return self.mpu_gyro_offsets # Accelerometer Calibration self.mpu_array = [] # clear array for next calibration accel_x, accel_y, accel_z = conn.getAccelValues() # Magnetometer Calibration self.mpu_array = [] # clear array for next calibration magneto_x, magneto_y, magneto_z = conn.getMagneticField() def read(self): return conn.getMagneticField() def stop(self): self.log_handler.log_and_print("stop magnetic sensor") conn.close() def pass_to_gui(self, data): self.queue.put(("data", data))