replay_influxdb.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import logging, time
  2. from datetime import datetime, timedelta
  3. from influxdb_client import InfluxDBClient
  4. from inputs.common import Input
  5. from structures.measurement import Measurement24v, Measurement480v
  6. from structures.plant import CompactLogixState, S7State
  7. dataclasses = [
  8. Measurement24v,
  9. Measurement480v,
  10. S7State,
  11. CompactLogixState,
  12. ]
  13. logger = logging.getLogger(__name__)
  14. class Replay(Input):
  15. def __init__(self, url, token, org, bucket, start_time) -> None:
  16. super().__init__(self.read_handler)
  17. self.interval = 1.0
  18. self.client = InfluxDBClient(url, token, org=org)
  19. self.bucket = bucket
  20. self.query_api = self.client.query_api()
  21. self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S %z")
  22. self.time_offset = datetime.now().astimezone() - self.current_time
  23. def read_handler(self):
  24. start = self.current_time
  25. end = start + timedelta(seconds=1)
  26. for result in self.query(start, end):
  27. self._q.put(result)
  28. self.current_time = end
  29. def query(self, start, stop):
  30. query = f'from(bucket:"{self.bucket}")\
  31. |> range(start: {start.isoformat()}, stop: {stop.isoformat()})\
  32. |> yield(name: "m")'
  33. result = self.query_api.query(query=query)
  34. results = []
  35. fields = {}
  36. old_dataclass = None
  37. for table in result:
  38. if table.records:
  39. record = table.records[0]
  40. for cls in dataclasses:
  41. if record.get_measurement() == cls.series:
  42. dataclass = cls
  43. break
  44. if old_dataclass != dataclass:
  45. results.extend(self.populate_dataclasses(old_dataclass, fields))
  46. fields = {}
  47. old_dataclass = dataclass
  48. for record in table.records:
  49. if not record.get_time() in fields:
  50. fields[record.get_time()] = {}
  51. field = fields[record.get_time()]
  52. if 'channel' in record.values:
  53. field[record.get_field()] = field[record.get_field()] + (record.get_value(), ) if record.get_field() in field else (record.get_value(), )
  54. else:
  55. field[record.get_field()] = record.get_value()
  56. field['source'] = record['source']
  57. results.extend(self.populate_dataclasses(dataclass, fields))
  58. return results
  59. def populate_dataclasses(self, dataclass, fields):
  60. for time, values in fields.items():
  61. yield dataclass(time + self.time_offset, **values)