123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- from distutils.command.config import config
- import queue
- import statistics
- from struct import calcsize
- import numpy as np
- import time
- from configparser import ConfigParser
- from sensors.calibration import CalibrationStateMashine
- from sensors.connection import globalArduinoSlave
- import logHandler
- conn = globalArduinoSlave()  # connection object used by the MagneticSensor methods below
class MagneticSensor:
    """Magnetic-field sensor that turns raw readings into GUI positions.

    Readings are fetched from the module-level ``conn`` object.  Each
    accepted reading is fed to the two-position calibration routine and then
    converted to an ``(x, y)`` position that is queued for the GUI.
    """

    # Number of readings averaged for each of the two calibration positions.
    SAMPLES_PER_POSITION = 100
    # Output value the placeholder position algorithm maps ``max_x``/``max_y`` to.
    POSITION_RANGE = 400

    def __init__(self, conf):
        """Load calibration constants from the ``mag_sensor`` section of *conf*.

        Args:
            conf: mapping-like configuration object providing
                ``conf["mag_sensor"]`` with the keys ``off_x``, ``off_y``,
                ``scale_x``, ``scale_y``, ``max_x`` and ``max_y``.
        """
        self.conf = conf
        self.queue = queue.Queue()
        self.calibration_state = CalibrationStateMashine()
        self.success = False

        section = conf["mag_sensor"]
        self.offset_x = float(section["off_x"])
        self.offset_y = float(section["off_y"])
        self.scale_x = float(section["scale_x"])
        self.scale_y = float(section["scale_y"])
        self.max_x = float(section["max_x"])
        self.max_y = float(section["max_y"])

        self.log_handler = logHandler.get_log_handler()

        # Created up front so calibrate() can never hit a missing attribute.
        self._reset_calibration_buffers()

    def _reset_calibration_buffers(self):
        """Clear the per-axis sample buffers and the per-position means."""
        self.time_vals = [[], [], []]
        self.cal_values = {
            "front": [0.0, 0.0, 0.0],
            "back": [0.0, 0.0, 0.0],
        }

    def start(self):
        """Open the connection if necessary and register the read callback."""
        if not conn.isConnected():
            conn.open()
        self.success = True
        conn.addRecvCallback(self._readCb)

    def _readCb(self, raw):
        """Connection callback: calibrate with and publish the latest reading.

        Args:
            raw: payload handed over by the connection; it is not used, the
                reading is fetched through ``conn.getMagneticField()``.
        """
        value = conn.getMagneticField()
        # Readings with a negative x or y component are skipped.
        if value[0] < 0 or value[1] < 0:
            return

        self.calibrate(value)
        position = self.calculate_position(value)
        if position is not None:
            # Normalise both parts to tuples so a list reading concatenates too.
            self.pass_to_gui(tuple(position) + tuple(value))

    def start_calibration(self):
        """Restart the calibration state machine with empty buffers."""
        self.calibration_state.reset_state()
        self._reset_calibration_buffers()
        self.calibration_state.next_state()

    def _accumulate(self, value, progress_base):
        """Store one reading and report the axis means once enough are in.

        Args:
            value: reading whose first three items are appended to the x, y
                and z buffers.
            progress_base: progress already reached before this position
                (0 for the first position, 50 for the second).

        Returns:
            ``[mean_x, mean_y, mean_z]`` once ``SAMPLES_PER_POSITION``
            readings were collected (the buffers are cleared at that point),
            otherwise ``None``.
        """
        for axis_samples, component in zip(self.time_vals, value[:3]):
            axis_samples.append(component)

        count = len(self.time_vals[0])
        # Each position contributes half of the 0..100 progress range.
        self.calibration_state.progress = (
            progress_base + 50 * count / self.SAMPLES_PER_POSITION
        )
        if count < self.SAMPLES_PER_POSITION:
            return None

        means = [statistics.mean(samples) for samples in self.time_vals]
        self.time_vals = [[], [], []]
        return means

    def _apply_calibration(self):
        """Derive offsets, scales and maxima from the front/back means."""
        front = self.cal_values["front"]
        back = self.cal_values["back"]

        # Hard-iron distortion: the offset is the midpoint of both readings.
        self.offset_x = (back[0] + front[0]) / 2
        self.offset_y = (back[1] + front[1]) / 2

        # Soft-iron distortion: rescale every axis to the mean half-range.
        deltas = [(b - f) / 2 for f, b in zip(front, back)]
        avg_delta = sum(deltas) / 3
        # An axis without any measured delta carries no scale information;
        # fall back to 1.0 instead of dividing by zero.
        self.scale_x, self.scale_y, self.scale_z = [
            avg_delta / delta if delta else 1.0 for delta in deltas
        ]

        # Max values for the placeholder position algorithm.
        self.max_x = (back[0] - self.offset_x) * self.scale_x
        self.max_y = (back[1] - self.offset_y) * self.scale_y

    def calibrate(self, value):
        """Advance the calibration with one reading.

        Nothing happens unless the state machine is in ``ACCUMULATING_1``
        or ``ACCUMULATING_2``.
        TODO (translated original note): should open a popup first - how?

        Args:
            value: reading with at least three components (x, y, z).

        Returns:
            ``(offset_x, offset_y, scale_x, scale_y, max_x, max_y)`` on the
            call that completes the second position, otherwise ``None``.
        """
        state = self.calibration_state
        current = state.get_state()

        if current == state.ACCUMULATING_1:
            means = self._accumulate(value, progress_base=0)
            if means is not None:
                self.cal_values["front"] = means
                state.next_state()  # signal gui to get next position
            return None

        if current != state.ACCUMULATING_2:
            return None

        means = self._accumulate(value, progress_base=50)
        if means is None:
            return None

        # All values have been captured.
        self.cal_values["back"] = means
        self._apply_calibration()
        state.next_state()
        return (
            self.offset_x,
            self.offset_y,
            self.scale_x,
            self.scale_y,
            self.max_x,
            self.max_y,
        )

    def read(self):
        """Return the current reading from ``conn.getMagneticField()``."""
        return conn.getMagneticField()

    def calculate_position(self, value=None):
        """Convert a reading into an ``(x, y)`` position.

        Args:
            value: reading to convert (items 0 and 1 are used).  When
                omitted, a fresh reading is taken from the connection.

        Returns:
            ``(x, y)`` scaled so that ``max_x``/``max_y`` map to
            ``POSITION_RANGE``, or ``None`` when ``max_x`` or ``max_y`` is
            zero and no position can be computed.
        """
        if value is None:
            value = conn.getMagneticField()
        if not self.max_x or not self.max_y:
            return None

        corrected_x = (value[0] - self.offset_x) * self.scale_x
        corrected_y = (value[1] - self.offset_y) * self.scale_y

        # Placeholder algorithm (to see if the sensor even works).
        x = (corrected_x * self.POSITION_RANGE) / self.max_x
        y = (corrected_y * self.POSITION_RANGE) / self.max_y
        return (x, y)

    def stop(self):
        """Log the shutdown, clear the success flag and close the connection."""
        self.log_handler.log_and_print("stop magnetic sensor")
        self.success = False
        conn.close()

    def pass_to_gui(self, data):
        """Queue *data* for the GUI as a ``("data", data)`` message."""
        self.queue.put(("data", data))
|