12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- import logging, time
- from datetime import datetime, timedelta
- from influxdb_client import InfluxDBClient
- from inputs.common import Input
- from structures.measurement import Measurement24v, Measurement480v
- from structures.plant import CompactLogixState, S7State
# Types that replayed records can be rebuilt into. Replay.query compares a
# record's measurement name against each class's `series` attribute, in the
# order listed here, and uses the first match.
dataclasses = [Measurement24v, Measurement480v, S7State, CompactLogixState]

logger = logging.getLogger(__name__)
class Replay(Input):
    """Input that replays stored measurements from an InfluxDB bucket.

    Every call to :meth:`read_handler` queries the next one-second window
    of the bucket, rebuilds the matching objects from ``dataclasses`` and
    puts them on ``self._q``. Timestamps of the rebuilt objects are shifted
    by the difference between the moment this instance was created and the
    configured start time.
    """

    def __init__(self, url, token, org, bucket, start_time) -> None:
        """Create the InfluxDB client and position the replay cursor.

        Args:
            url: Passed to ``InfluxDBClient`` as the server address.
            token: Passed to ``InfluxDBClient`` as the auth token.
            org: Passed to ``InfluxDBClient`` as the organization.
            bucket: Name of the bucket the Flux query reads from.
            start_time: First instant to replay, formatted as
                ``"%d.%m.%Y %H:%M:%S %z"``.

        Raises:
            ValueError: If ``start_time`` does not match that format.
        """
        super().__init__(self.read_handler)
        self.interval = 1.0
        self.client = InfluxDBClient(url, token, org=org)
        self.bucket = bucket
        self.query_api = self.client.query_api()
        self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S %z")
        # Added to every replayed timestamp in populate_dataclasses.
        self.time_offset = datetime.now().astimezone() - self.current_time

    def read_handler(self):
        """Queue everything recorded in the next one-second window.

        Advances ``self.current_time`` by one second afterwards so that the
        following call continues where this one stopped.
        """
        start = self.current_time
        end = start + timedelta(seconds=1)
        # NOTE(review): self._q is not set in this class; presumably it is
        # created by Input.__init__ - confirm.
        for result in self.query(start, end):
            self._q.put(result)
        self.current_time = end

    @staticmethod
    def _dataclass_for(record):
        """Return the class whose ``series`` equals the record's measurement, or None."""
        measurement = record.get_measurement()
        return next((cls for cls in dataclasses if cls.series == measurement), None)

    def query(self, start, stop):
        """Fetch records between ``start`` and ``stop`` and rebuild objects.

        Consecutive tables that map to the same class are merged: their
        records are grouped by timestamp and each group becomes one object.
        Fields of records that carry a ``channel`` value are collected into
        a tuple (one entry per record); all other fields hold a single
        value. Each group also receives the record's ``source`` value.

        Args:
            start: Beginning of the query range (``datetime``).
            stop: End of the query range (``datetime``).

        Returns:
            list: Rebuilt objects; empty when the window holds no data.
        """
        flux = f'from(bucket:"{self.bucket}")\
            |> range(start: {start.isoformat()}, stop: {stop.isoformat()})\
            |> yield(name: "m")'
        tables = self.query_api.query(query=flux)

        results = []
        pending = {}        # timestamp -> keyword arguments for current_cls
        current_cls = None  # class the pending groups belong to

        for table in tables:
            if not table.records:
                continue

            matched_cls = self._dataclass_for(table.records[0])
            if matched_cls is None:
                # Without this guard the records would be attributed to the
                # class of the previous table (or crash on the first table).
                logger.warning(
                    "Skipping table with unknown measurement %r",
                    table.records[0].get_measurement(),
                )
                continue

            if matched_cls != current_cls:
                # Class changed: emit what was collected for the old one.
                results.extend(self.populate_dataclasses(current_cls, pending))
                pending = {}
                current_cls = matched_cls

            for record in table.records:
                group = pending.setdefault(record.get_time(), {})
                name = record.get_field()
                if 'channel' in record.values:
                    group[name] = group.get(name, ()) + (record.get_value(),)
                else:
                    group[name] = record.get_value()
                group['source'] = record['source']

        # Flush the last group; a no-op when nothing was collected.
        results.extend(self.populate_dataclasses(current_cls, pending))
        return results

    def populate_dataclasses(self, dataclass, fields):
        """Yield one ``dataclass`` instance per timestamp in ``fields``.

        Args:
            dataclass: Callable invoked as ``dataclass(timestamp, **values)``.
            fields: Mapping of timestamp to keyword arguments.

        Yields:
            Instances whose timestamp is shifted by ``self.time_offset``.
        """
        for timestamp, values in fields.items():
            yield dataclass(timestamp + self.time_offset, **values)
|