snap7_server.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import snap7
  2. import logging
  3. import struct
  4. import re
  5. from datetime import datetime, tzinfo
  6. from inputs.common import Input
  7. localtz = datetime.now().astimezone().tzinfo
  8. logger = logging.getLogger(__name__)
  9. class SiemensServer(Input):
  10. interval = 0.02
  11. time_offset = None
  12. def __init__(self, port = 102):
  13. super().__init__(self.read_handler)
  14. self.server = snap7.server.Server(True)
  15. size = 100
  16. self.DB1 = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
  17. self.DB2 = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
  18. self.server.register_area(snap7.types.srvAreaDB, 1, self.DB1)
  19. self.server.register_area(snap7.types.srvAreaDB, 2, self.DB2)
  20. self.server.start(port)
  21. def read_handler(self):
  22. event : snap7.types.SrvEvent
  23. while True:
  24. event = self.server.pick_event()
  25. if not event:
  26. break
  27. text = self.server.event_text(event)
  28. match = re.match("^(?P<datetime>\d+-\d+-\d+ \d+:\d+:\d+) \[(?P<host>[\w\.:]+)\] (?P<type>[\w ]+), Area : (?P<area>.+), Start : (?P<start>\d+), Size : (?P<size>\d+) --> (?P<response>.+)$", text)
  29. if not match:
  30. logger.warn(text)
  31. continue
  32. if match.group("type") != "Write request":
  33. logger.warn(text)
  34. continue
  35. if int(match.group("start")) + int(match.group("size")) <= 4:
  36. continue
  37. if match.group("area") == "DB1":
  38. raw = bytearray(self.DB1)
  39. timestamp = self.get_timestamp(raw[0:4])
  40. self.queue_ifm_from_bytes("S7", timestamp, raw[4:34])
  41. elif match.group("area") == "DB2":
  42. raw = bytearray(self.DB2)
  43. timestamp = self.get_timestamp(raw[0:4])
  44. self.queue_energy_meter_from_bytes("S7", timestamp, raw[4:40])
  45. def get_timestamp(self, raw):
  46. now = datetime.now(localtz)
  47. cpu_time = struct.unpack(">I", raw)[0] / 1000
  48. offset = now.timestamp() - cpu_time
  49. if self.time_offset:
  50. self.time_offset = self.time_offset * 0.999 + offset * 0.001
  51. else:
  52. self.time_offset = offset
  53. timestamp = datetime.fromtimestamp(self.time_offset + cpu_time, localtz)
  54. return timestamp