aggregators.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. from datetime import timedelta
  2. from dataclasses import dataclass, fields
  3. from .common import MatchSeries, ALLOWED_NAMES, ALLOWED_GLOBALS
  4. """
  5. This middleware aggregates fields over a specified timespan.
  6. """
  7. class Aggregate(MatchSeries):
  8. def __init__(self, parent, series, timespan, avg=[], sum=[], last=[], first=[], min=[], max=[]) -> None:
  9. super().__init__(series)
  10. self._timespan = timedelta(seconds=timespan)
  11. self._avg = avg
  12. self._sum = sum
  13. self._last = last
  14. self._first = first
  15. self._min = min
  16. self._max = max
  17. self._trigger_time = None
  18. self._datasets = []
  19. labels = {f: 'avg' for f in self._avg}
  20. labels.update({f: 'sum' for f in self._sum})
  21. labels.update({f: 'last' for f in self._last})
  22. labels.update({f: 'first' for f in self._first})
  23. labels.update({f: 'min' for f in self._min})
  24. labels.update({f: 'max' for f in self._max})
  25. self._name = f"{series} ({self._timespan.total_seconds()}s) {', '.join(f'{v}({k})' for k, v in labels.items())}"
  26. def execute(self, values):
  27. hasTriggered = False
  28. for measurement in values:
  29. dataset = self.get_series(measurement)
  30. if not dataset:
  31. continue
  32. self._last_measurement = measurement
  33. # set trigger time if not set
  34. if self._trigger_time is None:
  35. self._trigger_time = dataset.timestamp + self._timespan
  36. # check if we need to trigger
  37. if dataset.timestamp >= self._trigger_time and self._datasets:
  38. hasTriggered = True
  39. yield self.set_series(self._last_measurement, self.trigger())
  40. self._datasets.append(dataset)
  41. # trigger if we haven't received any new data or the trigger time has passed
  42. if (not hasTriggered and
  43. self._datasets and
  44. self._trigger_time and
  45. ((values and values[-1].timestamp >= self._trigger_time) or not values)):
  46. yield self.set_series(self._last_measurement, self.trigger())
  47. def apply_function(self, func, fields: list):
  48. last = self._datasets[-1]
  49. for field in fields:
  50. if isinstance(getattr(last, field), tuple):
  51. n = range(len(getattr(last, field)))
  52. yield (field, tuple(func(getattr(x, field)[i] for x in self._datasets) for i in n))
  53. else:
  54. yield (field, func(getattr(x, field) for x in self._datasets))
  55. def trigger(self):
  56. last = self._datasets[-1]
  57. field_dict = last.__dict__.copy()
  58. field_dict['series'] = self._name
  59. field_dict['source'] = 'aggregation'
  60. # apply aggregation functions
  61. field_dict.update(self.apply_function(lambda v: sum(v) / len(self._datasets), self._avg))
  62. field_dict.update(self.apply_function(sum, self._sum))
  63. field_dict.update((f, getattr(self._datasets[-1], f, 0)) for f in self._last)
  64. field_dict.update((f, getattr(self._datasets[0], f, 0)) for f in self._first)
  65. field_dict.update(self.apply_function(min, self._min))
  66. field_dict.update(self.apply_function(max, self._max))
  67. self._trigger_time = None
  68. self._datasets = []
  69. return type(last)(**field_dict)