import queue import time import threading from sensors.calibration import CalibrationStateMashine from sensors.connection import globalArduinoSlave import logHandler conn = globalArduinoSlave() class MagneticSensor: def __init__(self, conf): self.conf = conf self.queue = queue.Queue() self.calibration_state = CalibrationStateMashine() self.field_height = float(conf["field"]["y"]) self.field_width = float(conf["field"]["x"]) self.success = False self.log_handler = logHandler.get_log_handler() # neu #pass self.n = 0 def start(self): self.log_handler.log_and_print("start magnetic sensor") if not conn.isConnected(): conn.open(port = self.conf["arduino"]["port"]) conn.addRecvCallback(self._readCb) self.dummyActive = True dummyThread = threading.Thread(target=self._readCb_dummy) dummyThread.start() def _readCb(self, raw): print("mag: ", conn.getMagneticField()) def _readCb_dummy(self): self.log_handler.log_and_print("magnetic sensor: generating test values") while self.dummyActive: if self.n % 4 < 1: dummyPosition = (0, self.n%1 * self.field_height) elif self.n % 4 < 2: dummyPosition = (self.n%1 * self.field_width, self.field_height) elif self.n % 4 < 3: dummyPosition = (self.field_width, self.field_height - self.n%1 * self.field_height) else: dummyPosition = (self.field_width - self.n%1 * self.field_width, 0) self.n += 0.01 def calibrate(self, x, y): pass def read(self): return conn.getMagneticField() def stop(self): # neu self.log_handler.log_and_print("stop magnetic sensor") self.dummyActive = False conn.close()