12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- from datetime import timedelta
- from dataclasses import dataclass, fields
- from .common import MatchSeries, ALLOWED_NAMES, ALLOWED_GLOBALS
- """
- This middleware aggregates fields over a specified timespan.
- """
- class Aggregate(MatchSeries):
- def __init__(self, parent, series, timespan, avg=[], sum=[], last=[], first=[], min=[], max=[]) -> None:
- super().__init__(series)
- self._timespan = timedelta(seconds=timespan)
- self._avg = avg
- self._sum = sum
- self._last = last
- self._first = first
- self._min = min
- self._max = max
- self._trigger_time = None
- self._datasets = []
- labels = {f: 'avg' for f in self._avg}
- labels.update({f: 'sum' for f in self._sum})
- labels.update({f: 'last' for f in self._last})
- labels.update({f: 'first' for f in self._first})
- labels.update({f: 'min' for f in self._min})
- labels.update({f: 'max' for f in self._max})
- self._name = f"{series} ({self._timespan.total_seconds()}s) {', '.join(f'{v}({k})' for k, v in labels.items())}"
- def execute(self, values):
- hasTriggered = False
- for measurement in values:
- dataset = self.get_series(measurement)
- if not dataset:
- continue
- self._last_measurement = measurement
-
- # set trigger time if not set
- if self._trigger_time is None:
- self._trigger_time = dataset.timestamp + self._timespan
-
- # check if we need to trigger
- if dataset.timestamp >= self._trigger_time and self._datasets:
- hasTriggered = True
- yield self.set_series(self._last_measurement, self.trigger())
- self._datasets.append(dataset)
- # trigger if we haven't received any new data or the trigger time has passed
- if (not hasTriggered and
- self._datasets and
- self._trigger_time and
- ((values and values[-1].timestamp >= self._trigger_time) or not values)):
- yield self.set_series(self._last_measurement, self.trigger())
- def apply_function(self, func, fields: list):
- last = self._datasets[-1]
- for field in fields:
- if isinstance(getattr(last, field), tuple):
- n = range(len(getattr(last, field)))
- yield (field, tuple(func(getattr(x, field)[i] for x in self._datasets) for i in n))
- else:
- yield (field, func(getattr(x, field) for x in self._datasets))
- def trigger(self):
- last = self._datasets[-1]
- field_dict = last.__dict__.copy()
- field_dict['series'] = self._name
- field_dict['source'] = 'aggregation'
- # apply aggregation functions
- field_dict.update(self.apply_function(lambda v: sum(v) / len(self._datasets), self._avg))
- field_dict.update(self.apply_function(sum, self._sum))
- field_dict.update((f, getattr(self._datasets[-1], f, 0)) for f in self._last)
- field_dict.update((f, getattr(self._datasets[0], f, 0)) for f in self._first)
- field_dict.update(self.apply_function(min, self._min))
- field_dict.update(self.apply_function(max, self._max))
- self._trigger_time = None
- self._datasets = []
- return type(last)(**field_dict)
-
|