12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- import queue
- from struct import calcsize
- import numpy as np
- import time
- import threading # ?
- import random # ?
- from sensors.connection import globalArduinoSlave
- import logHandler
- conn = globalArduinoSlave()
- class MagneticSensor:
- def __init__(self, conf):
- self.conf = conf
- self.queue = queue.Queue()
- self.success = False
- self.mpu_array = []
- self.measurements_gyro = float(conf["mag_sensor"]["measurements_gyro"])
- self.measurements_accel = float(conf["mag_sensor"]["measurements_accel"])
- self.mpu_gyro_offsets = [0.0,0.0,0.0]
- self.log_handler = logHandler.get_log_handler()
-
- def start(self):
- if not conn.isConnected():
- conn.open()
- conn.addRecvCallback(self._readCb)
- #self.dummyActive = True
- #dummyThread = threading.Thread(target=self._readCb_dummy)
- #dummyThread.start()
- def _readCb(self, raw):
- #print("mag: ", conn.getMagneticField())
- #print("accel: ", conn.getAccelValues())
- print("gyro: ", conn.getGyroValues())
- def calibrate(self):
- # Gyroscope Calibration (TESTING!)
- self.mpu_array = [] # clear Array for next calibration
- while True:
- try:
- gyro_x, gyro_y, gyro_z = conn.getGyroValues()
- except:
- continue
-
- self.mpu_array.append([gyro_x,gyro_y,gyro_z])
-
- if np.shape(self.mpu_array)[0] == self.measurements_gyro:
- for qq in range(0,3):
- self.mpu_gyro_offsets[qq] = np.mean(np.array(self.mpu_array)[:,qq]) # average
- break
- print('Gyro Calibration Complete')
- print(self.mpu_gyro_offsets)
- return self.mpu_gyro_offsets
-
- # Accelerometer Calibration
- self.mpu_array = [] # clear array for next calibration
- accel_x, accel_y, accel_z = conn.getAccelValues()
- # Magnetometer Calibration
- self.mpu_array = [] # clear array for next calibration
- magneto_x, magneto_y, magneto_z = conn.getMagneticField()
- def read(self):
- return conn.getMagneticField()
- def stop(self):
- self.log_handler.log_and_print("stop magnetic sensor")
- conn.close()
- def pass_to_gui(self, data):
- self.queue.put(("data", data))
|