# allen_bradley_connect.py
  1. import logging
  2. from pylogix import PLC
  3. from datetime import datetime
  4. from structures.plant import *
  5. from structures.measurement import *
  6. from inputs.common import Input
# Timezone info of the host's local time, evaluated once when this module is
# imported and reused for every timestamp taken in read_handler().
# NOTE(review): because the value is captured at import, a UTC-offset change
# during a long-running process (e.g. a DST switch) is presumably not picked
# up — confirm whether that matters for this deployment.
localtz = datetime.now().astimezone().tzinfo
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
  9. class AllenBradleyCPU(Input):
  10. def __init__(self, host):
  11. super().__init__(self.read_handler)
  12. self.comm = PLC()
  13. self.comm.IPAddress = host
  14. self.interval = 0.02
  15. self.cpu_state_tags = {
  16. "name": "B1[0]",
  17. }
  18. self.measurement_tags = {
  19. "24V": "B200",
  20. "480V": "B201"
  21. }
  22. self.tags = list(self.cpu_state_tags.values()) + list(self.measurement_tags.values())
  23. def read_handler(self):
  24. timestamp = datetime.now(localtz)
  25. ret = self.comm.Read(self.tags)
  26. if ret[0].Status == "Success":
  27. cpu_values = {t: r.Value for t, r in zip(self.cpu_state_tags, ret)}
  28. self._q.put(CompactLogixState(timestamp, "AB", **cpu_values))
  29. offset = self.cpu_state_tags.values()
  30. ifm_values_count = 22
  31. data = [r.Value for r in ret[offset:offset+ifm_values_count]]
  32. channels = 16
  33. self._q.put(Measurement24v(timestamp, "AB",
  34. current = tuple([x / 10 for x in data[0:channels]]),
  35. status = tuple([data[16] & (1 << i) > 0 for i in range(channels)]),
  36. overload = tuple([data[17] & (1 << i) > 0 for i in range(channels)]),
  37. short_circuit = tuple([data[18] & (1 << i) > 0 for i in range(channels)]),
  38. limit = tuple([data[19] & (1 << i) > 0 for i in range(channels)]),
  39. pushbutton = tuple([data[20] & (1 << i) > 0 for i in range(channels)]),
  40. voltage = data[22] / 100
  41. ))
  42. offset += ifm_values_count
  43. data = [r.Value for r in ret[offset:offset+9]]
  44. self._q.put(Measurement480v(timestamp, "AB",
  45. voltage = tuple(data[0:3]),
  46. current = tuple(data[3:6]),
  47. phase = tuple(data[6:9])
  48. ))
  49. else:
  50. logger.error("CPU read: " + ret[0].Status)