common.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from threading import Thread
  2. from queue import Queue
  3. import time
  4. import struct
  5. import logging
  6. from structures.measurement import *
  7. logger = logging.getLogger(__name__)
  8. class Input:
  9. _t = None
  10. _stop = False
  11. _q = Queue()
  12. interval = 1.0
  13. _read_cb = None
  14. def __init__(self, read_cb) -> None:
  15. self._read_cb = read_cb
  16. def start(self):
  17. if not self._t:
  18. self._stop = False
  19. self._t = Thread(target = self._main)
  20. self._t.setDaemon(True)
  21. self._t.start()
  22. def stop(self):
  23. if self._t:
  24. self._stop = True
  25. self._t.join()
  26. self._t = None
  27. def read(self):
  28. while not self._q.empty():
  29. yield self._q.get()
  30. def _main(self):
  31. start_time = time.monotonic()
  32. while not self._stop:
  33. try:
  34. self._read_cb()
  35. except Exception as e:
  36. logger.exception(F"An exception occured while reading from {type(self)}!")
  37. time.sleep(1)
  38. end_time = time.monotonic()
  39. remaining = self.interval + start_time - end_time
  40. if remaining > 0:
  41. start_time += self.interval
  42. time.sleep(remaining)
  43. else:
  44. start_time = end_time
  45. def queue_ifm_from_bytes(self, source, timestamp, raw, channels = 16):
  46. data = struct.unpack(">" + "B" * 16 + "HHHHHBxH", raw)
  47. current = tuple([x / 10 for x in data[0:channels]])
  48. status = tuple([data[16] & (1 << i) > 0 for i in range(channels)])
  49. overload = tuple([data[17] & (1 << i) > 0 for i in range(channels)])
  50. short_circuit = tuple([data[18] & (1 << i) > 0 for i in range(channels)])
  51. limit = tuple([data[19] & (1 << i) > 0 for i in range(channels)])
  52. pushbutton = tuple([data[20] & (1 << i) > 0 for i in range(channels)])
  53. voltage = data[22] / 100
  54. self._q.put(Measurement24v(timestamp, source, current, status, overload, short_circuit, limit, pushbutton, voltage))
  55. def queue_energy_meter_from_bytes(self, source, timestamp, raw):
  56. data = struct.unpack(">" + "f" * 9, raw)
  57. voltage = tuple(data[0:3])
  58. current = tuple(data[3:6])
  59. phase = tuple(data[6:9])
  60. self._q.put(Measurement480v(timestamp, source, voltage, current, phase))