|
@@ -1,62 +0,0 @@
|
|
|
-import queue
|
|
|
-import time
|
|
|
-import threading
|
|
|
-
|
|
|
-from sensors.calibration import CalibrationStateMashine
|
|
|
-from sensors.connection import globalArduinoSlave
|
|
|
-import logHandler
|
|
|
-
|
|
|
-conn = globalArduinoSlave()
|
|
|
-
|
|
|
-
|
|
|
-class MagneticSensor:
|
|
|
- def __init__(self, conf):
|
|
|
- self.conf = conf
|
|
|
- self.queue = queue.Queue()
|
|
|
- self.calibration_state = CalibrationStateMashine()
|
|
|
- self.field_height = float(conf["field"]["y"])
|
|
|
- self.field_width = float(conf["field"]["x"])
|
|
|
-
|
|
|
- self.success = False
|
|
|
- self.log_handler = logHandler.get_log_handler() # neu
|
|
|
- #pass
|
|
|
-
|
|
|
- self.n = 0
|
|
|
-
|
|
|
- def start(self):
|
|
|
- self.log_handler.log_and_print("start magnetic sensor")
|
|
|
- if not conn.isConnected():
|
|
|
- conn.open(port = self.conf["arduino"]["port"])
|
|
|
- conn.addRecvCallback(self._readCb)
|
|
|
- self.dummyActive = True
|
|
|
- dummyThread = threading.Thread(target=self._readCb_dummy)
|
|
|
- dummyThread.start()
|
|
|
-
|
|
|
- def _readCb(self, raw):
|
|
|
- print("mag: ", conn.getMagneticField())
|
|
|
-
|
|
|
- def _readCb_dummy(self):
|
|
|
- self.log_handler.log_and_print("magnetic sensor: generating test values")
|
|
|
- while self.dummyActive:
|
|
|
- if self.n % 4 < 1:
|
|
|
- dummyPosition = (0, self.n%1 * self.field_height)
|
|
|
- elif self.n % 4 < 2:
|
|
|
- dummyPosition = (self.n%1 * self.field_width, self.field_height)
|
|
|
- elif self.n % 4 < 3:
|
|
|
- dummyPosition = (self.field_width, self.field_height - self.n%1 * self.field_height)
|
|
|
- else:
|
|
|
- dummyPosition = (self.field_width - self.n%1 * self.field_width, 0)
|
|
|
-
|
|
|
- self.n += 0.01
|
|
|
-
|
|
|
-
|
|
|
- def calibrate(self, x, y):
|
|
|
- pass
|
|
|
-
|
|
|
- def read(self):
|
|
|
- return conn.getMagneticField()
|
|
|
-
|
|
|
- def stop(self): # neu
|
|
|
- self.log_handler.log_and_print("stop magnetic sensor")
|
|
|
- self.dummyActive = False
|
|
|
- conn.close()
|