1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- import queue
- from struct import calcsize
- import numpy as np
- import time
- import threading # ?
- import random # ?
- from sensors.connection import globalArduinoSlave
- import logHandler
- conn = globalArduinoSlave()
- class MagneticSensor:
- def __init__(self, conf):
- self.conf = conf
- self.queue = queue.Queue()
- self.success = False
- self.mpu_array = []
- self.mpu_gyro_offsets = [0.0,0.0,0.0]
- self.log_handler = logHandler.get_log_handler()
-
- def start(self):
- if not conn.isConnected():
- conn.open()
- conn.addRecvCallback(self._readCb)
- self.calibrate()
- #self.dummyActive = True
- #dummyThread = threading.Thread(target=self._readCb_dummy)
- #dummyThread.start()
- def _readCb(self, raw):
- #print("mag: ", conn.getMagneticField())
- #print("accel: ", conn.getAccelValues())
- print("gyro: ", conn.getGyroValues())
- def calibrate(self, conf):
- # Gyroscope Calibration
- self.mpu_array = [] # clear Array for next calibration
- while True:
- try:
- gyro_x, gyro_y, gyro_z = conn.getGyroValues()
- except:
- continue
-
- self.mpu_gyro_array.append([gyro_x,gyro_y,gyro_z])
-
- if np.shape(self.mpu_array)[0] == conf.measurements_gyro:
- for qq in range(0,3):
- self.mpu_gyro_offsets[qq] = np.mean(np.array(self.mpu_array)[:,qq]) # average
- break
- print('Gyro Calibration Complete')
- return self.mpu_gyro_offsets
-
- # Accelerometer Calibration
- self.mpu_array = [] # clear array for next calibration
- accel_x, accel_y, accel_z = conn.getAccelValues()
- # Magnetometer Calibration
- self.mpu_array = [] # clear array for next calibration
- magneto_x, magneto_y, magneto_z = conn.getMagneticField()
- def read(self):
- return conn.getMagneticField()
- def stop(self):
- self.log_handler.log_and_print("stop magnetic sensor")
- conn.close()
- def pass_to_gui(self, data):
- self.queue.put(("data", data))
|