influxdb.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import logging
  2. from influxdb_client import InfluxDBClient, Point
  3. from influxdb_client.client.write_api import SYNCHRONOUS
  4. import dataclasses
  5. logger = logging.getLogger(__name__)
  6. class InfluxDB:
  7. def __init__(self, url, token, org, bucket):
  8. self.client = InfluxDBClient(url, token, org=org)
  9. self.bucket = bucket
  10. self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
  11. self.query_api = self.client.query_api()
  12. def write(self, values):
  13. points = []
  14. for meas in values:
  15. p = Point(meas.series).time(meas.timestamp).tag("source", meas.source)
  16. for field in dataclasses.fields(meas):
  17. if not field.name in ["timestamp", "series", "source"]:
  18. value = getattr(meas, field.name)
  19. if type(value) is bool:
  20. p.field(field.name, int(value))
  21. elif not type(value) is tuple:
  22. p.field(field.name, value)
  23. else:
  24. for i, v in enumerate(value):
  25. pt = Point(meas.series).time(meas.timestamp).tag("source", meas.source).tag("channel", i)
  26. if type(v) is bool:
  27. pt.field(F"{field.name}", int(v))
  28. else:
  29. pt.field(F"{field.name}", v)
  30. points.append(pt)
  31. points.append(p)
  32. try:
  33. self.write_api.write(bucket=self.bucket, record=points)
  34. except Exception as ex:
  35. logger.exception("Influx DB write failed")