jm-hsa 1 year ago
commit
51f8668414

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+# python artifacts
+*.pyc
+*.pyo

+ 14 - 0
Dockerfile

@@ -0,0 +1,14 @@
+# syntax=docker/dockerfile:1
+FROM python:3-slim
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PYTHONUNBUFFERED=1
+WORKDIR /app
+
+COPY ./app/requirements.txt /app/requirements.txt
+
+RUN pip install -r requirements.txt
+COPY ./app/ /app/
+
+EXPOSE 102
+
+CMD ["python3", "main.py"]

+ 172 - 0
README.md

@@ -0,0 +1,172 @@
+# Energy Monitoring System Software
+
+## Installation with Docker
+
+Prerequisites:
+- Linux environment
+- Git
+- Docker
+- docker-compose
+
+
+```bash
+git clone https://github.com/jm-hsa/plc-connector.git
+cd plc-connector
+sudo docker-compose build
+sudo docker-compose up -d
+```
+
+## Installation without Docker
+
+Prerequisites:
+- Git
+- Python 3.9
+- the Python package manager `pip`
+- admin rights (optional, for the `snap7` server on port 102)
+- an `influxdb` server
+
+
+```cmd
+git clone https://github.com/jm-hsa/plc-connector.git
+cd plc-connector/app
+python -m pip install -r requirements.txt
+python main.py -c ../config/config.yml
+```
+
+## Setting up the InfluxDB server
+
+The `influxdb` server can be managed at http://localhost:8086. There you create a user and then generate tokens for the applications that access the database. The Explore view lets you build individual queries graphically and combine them into a dashboard, e.g. in Grafana.
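+
+For reference, a minimal sketch (not part of this repository) of how an application can use such a token with the `influxdb_client` package that this project already depends on; the URL, token, org and bucket values are placeholders:
+
+```python
+from influxdb_client import InfluxDBClient
+
+# placeholder values -- use the token and org created in the InfluxDB UI
+client = InfluxDBClient(url="http://localhost:8086", token="<token>", org="myorg")
+
+# simple Flux query against the bucket used by this project
+query = 'from(bucket: "energy-monitor") |> range(start: -1h)'
+for table in client.query_api().query(query):
+  for record in table.records:
+    print(record.get_time(), record.get_field(), record.get_value())
+```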
+
+## Configuration
+
+The PLC connector application is configured via the file `config.yml`.
+
+It activates a selection of modules, which are divided into three categories (a sketch of how these entries are loaded follows the example below):
+
+```yaml
+Inputs:
+  # Input modules for reading measurements and plant states
+
+  # Definition of the first module
+  #   ClassName: name of the module's Python class
+  #   path.to.module: import path of the input module, relative to the inputs package
+  - ClassName: path.to.module
+    # Whether the module should be loaded
+    # Default: True
+    enabled: True
+    # Parameters that are passed to the module's constructor
+    param1: "value 1"
+    param2: "value 2"
+
+  # Definition of further modules
+  - ClassName2: path.to.module2
+    enabled: True
+
+Middlewares:
+  # Middleware modules for processing measurements and plant states
+
+  # Definition of the first module
+  #   ClassName: name of the module's Python class
+  #   path.to.module: import path of the middleware module, relative to the middlewares package
+  - ClassName: path.to.module
+    enabled: False
+
+  - TimeCorrelation: time_correlation
+    # Middleware modules can be nested so that they further process the results of the enclosing module
+    submodules:
+    - PrintStats: debug
+      # By default, the results of middleware modules without submodules are collected for output and deduplicated
+
+Outputs:
+  # Output modules for writing results to any number of databases
+  # Definition of the first module
+  #   ClassName: name of the module's Python class
+  #   path.to.module: import path of the output module, relative to the outputs package
+  - CSVStorage: csv_file
+    path: logs
+
+  - InfluxDB: influxdb
+    url: "http://localhost:8086"
+    token: "<token>"
+    org: "myorg"
+    bucket: "energy-monitor"
+```
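+
+Internally, each list entry is resolved to a Python class: the first key is the class name, its value is the module path relative to the category package (`inputs`, `middlewares` or `outputs`), and all remaining keys are passed to the class constructor. A simplified sketch of the loading logic in `app/main.py`:
+
+```python
+from importlib import import_module
+
+def create_module(item: dict, category: str):
+  # first key is the class name, its value the module path, e.g. {"InfluxDB": "influxdb"}
+  cls_name = next(iter(item))
+  module = import_module(f"{category}s.{item[cls_name]}")  # e.g. "outputs.influxdb"
+  params = {k: v for k, v in item.items()
+            if k not in (cls_name, "enabled", "submodules", "enable_output")}
+  return getattr(module, cls_name)(**params)  # remaining keys become constructor arguments
+```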
+
+## Input modules
+
+- `SiemensCPU: siemens.snap7_connect` connects to a Siemens PLC and actively polls a data block
+  ```yaml
+  host: "192.168.0.10" # address of the CPU
+  ```
+- `SiemensServer: siemens.snap7_server` emulates an S7 PLC and passively provides data blocks that can be written to
+  ```yaml
+  port: 102 # port of the S7 server
+  ```
+- `Balluff: balluff.balluff_html` periodically polls a Balluff IO-Link master for process data. Should not be used because of its poor performance!
+
+- `AllenBradleyCPU: rockwell.allen_bradley_connect` establishes an EtherNet/IP connection to a Compact Logix controller and actively polls a list of tags
+  ```yaml
+  host: "192.168.1.10" # address of the CPU
+  ```
+
+- `Replay: replay_influxdb` replays recorded measurements from an InfluxDB database, so that measurement runs can be repeated without hardware
+  ```yaml
+  # InfluxDB parameters
+  url: "http://localhost:8086"
+  token: "<token>"
+  org: "myorg"
+  bucket: "energy-monitor"
+
+  # Timestamp at which the replay starts (must include a UTC offset)
+  start_time: 01.01.2000 00:00:00 +01:00
+  ```
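+
+Custom input modules follow the same pattern as the modules above: a class derived from `inputs.common.Input` that registers a read callback and puts measurements into its queue (compare `app/inputs/dummy.py`). A minimal, illustrative sketch:
+
+```python
+from datetime import datetime
+from inputs.common import Input
+from structures.measurement import Measurement480v
+
+class MyInput(Input):
+  def __init__(self, host):  # parameters come from config.yml
+    super().__init__(self.read_handler)
+    self.host = host
+    self.interval = 1.0  # polling interval in seconds
+
+  def read_handler(self):
+    # read from the device at self.host here, then queue the measurement
+    self._q.put(Measurement480v(datetime.now().astimezone(), "my-source",
+                                voltage=(230.0, 230.0, 230.0),
+                                current=(1.0, 1.0, 1.0),
+                                phase=(0.0, 0.0, 0.0)))
+```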
+
+## Middleware modules
+
+- `PrintStats: debug` prints input statistics to the console and warns when an input delivers no data.
+
+- `TimeCorrelation: time_correlation` combines several data streams into a single time-monotonic dataset. Example:
+  ```json
+  [
+    {"timestamp": "00:00:00", "series": "24V", "value": 123},
+    {"timestamp": "00:00:02", "series": "24V", "value": 789},
+    {"timestamp": "00:00:01", "series": "480V", "value": 456}
+  ]
+  ```
+  is correlated into:
+  ```json
+  [
+    {
+      "timestamp": "00:00:00",
+      "24V": {"value": 123}
+    }, 
+    {
+      "timestamp": "00:00:01", 
+      "24V": {"value": 123},
+      "480V": {"value": 456}
+    }, 
+    {
+      "timestamp": "00:00:02", 
+      "24V": {"value": 789}, 
+      "480V": {"value": 456}
+    }
+  ]
+  ```
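+
+A middleware is a class that receives its parent module as the first constructor argument and implements `execute(values)`, returning or yielding the processed measurements (compare `app/middlewares/debug.py`). A minimal, illustrative pass-through sketch (`threshold` is only an example parameter):
+
+```python
+class Passthrough:
+  def __init__(self, parent, threshold=0.5):  # additional keys from config.yml become keyword arguments
+    self.threshold = threshold
+
+  def execute(self, values):
+    # inspect, filter or transform the incoming measurements here
+    for measurement in values:
+      yield measurement
+```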
+
+## Output modules
+
+Results of the middleware modules are stored in separate series based on the `series` field. If no `series` field is present (e.g. after `time_correlation`), all remaining fields are treated as independent series. At any point in time there can be only one value (one CSV row / one Influx record) per `series`.
+
+- `CSVStorage: csv_file` stores the results in CSV files and periodically adds them to a ZIP archive
+  ```yaml
+  path: logs # directory in which the files are stored
+  ```
+
+- `InfluxDB: influxdb` stores the results in an InfluxDB bucket.
+  ```yaml
+  # InfluxDB parameters
+  url: "http://localhost:8086"
+  token: "<token>"
+  org: "myorg"
+  bucket: "energy-monitor"
+  ```
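+
+Custom output modules only need a `write(values)` method that receives the sorted results of each cycle (compare `app/outputs/stdout.py`). A minimal, illustrative sketch (`prefix` is only an example parameter):
+
+```python
+class DebugOutput:
+  def __init__(self, prefix="OUT"):  # parameters come from config.yml
+    self.prefix = prefix
+
+  def write(self, values):
+    # called once per cycle with all results of that cycle
+    for measurement in values:
+      print(self.prefix, measurement.series, measurement.timestamp)
+```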

+ 11 - 0
app/inputs/balluff/balluff_ethernet_ip.py

@@ -0,0 +1,11 @@
+from cpppo.server.enip import client
+from cpppo.server.enip.getattr import attribute_operations
+
+HOST = "192.168.1.100"
+TAGS = ["@4/100/3"]
+
+with client.connector(host=HOST) as conn:
+    for index, descr, op, reply, status, value in conn.synchronous(
+        operations=attribute_operations(
+            TAGS, route_path=[], send_path='' )):
+        print(": %20s: %s" % (descr, value))

+ 36 - 0
app/inputs/balluff/balluff_html.py

@@ -0,0 +1,36 @@
+from datetime import datetime
+import time
+import requests, json
+import re
+
+from inputs.common import Input
+
+class Balluff(Input):
+
+  cpu_start_time = None
+  cpu_last_time = None
+  local_start_time = time.time()
+  db = 1
+  interval = 0.05
+  url = "http://192.168.10.20/ports.jsn"
+  port = 0
+
+  def __init__(self):
+    super().__init__(self.read_handler)
+
+  def read_handler(self):
+    try:
+      req = requests.get(self.url)
+    except requests.exceptions.ConnectionError:
+      return
+
+    timestamp = datetime.utcnow()
+    response = json.loads(req.text)
+    if not re.match("^DF210[01]$", response['ports'][self.port]['productId']):
+      raise Exception("unsupported device " + response['ports'][self.port]['productId'])
+
+    data = response['ports'][self.port]['processInputs'].split(" ")
+    data = bytes([int(x, 16) for x in data])
+    
+    # pass a source label first; queue_ifm_from_bytes expects (source, timestamp, raw)
+    self.queue_ifm_from_bytes("Balluff", timestamp, data)
+      

+ 72 - 0
app/inputs/common.py

@@ -0,0 +1,72 @@
+from threading import Thread
+from queue import Queue
+import time
+import struct
+import logging
+
+from structures.measurement import *
+
+logger = logging.getLogger(__name__)
+
+class Input:
+  _t = None
+  _stop = False
+  _q = Queue()
+
+  interval = 1.0
+  _read_cb = None
+
+  def __init__(self, read_cb) -> None:
+      self._read_cb = read_cb
+
+  def start(self):
+    if not self._t:
+      self._stop = False
+      self._t = Thread(target = self._main)
+      self._t.daemon = True
+      self._t.start()
+
+  def stop(self):
+    if self._t:
+      self._stop = True
+      self._t.join()
+      self._t = None
+
+  def read(self):
+    while not self._q.empty():
+      yield self._q.get()
+
+  def _main(self):
+    start_time = time.monotonic()
+    while not self._stop:
+      try:
+        self._read_cb()
+      except Exception as e:
+        logger.exception(f"An exception occurred while reading from {type(self)}!")
+        time.sleep(1)
+
+      end_time = time.monotonic()
+      remaining = self.interval + start_time - end_time
+      if remaining > 0:
+        start_time += self.interval
+        time.sleep(remaining)
+      else:
+        start_time = end_time
+        
+  def queue_ifm_from_bytes(self, source, timestamp, raw, channels = 16):
+    data = struct.unpack(">" + "B" * 16 + "HHHHHBxH", raw)
+    current = tuple([x / 10 for x in data[0:channels]])
+    status = tuple([data[16] & (1 << i) > 0 for i in range(channels)])
+    overload = tuple([data[17] & (1 << i) > 0 for i in range(channels)])
+    short_circuit = tuple([data[18] & (1 << i) > 0 for i in range(channels)])
+    limit = tuple([data[19] & (1 << i) > 0 for i in range(channels)])
+    pushbutton = tuple([data[20] & (1 << i) > 0 for i in range(channels)])
+    voltage = data[22] / 100
+    self._q.put(Measurement24v(timestamp, source, current, status, overload, short_circuit, limit, pushbutton, voltage))
+
+  def queue_energy_meter_from_bytes(self, source, timestamp, raw):
+    data = struct.unpack(">" + "f" * 9, raw)
+    voltage = tuple(data[0:3])
+    current = tuple(data[3:6])
+    phase = tuple(data[6:9])
+    self._q.put(Measurement480v(timestamp, source, voltage, current, phase))

+ 68 - 0
app/inputs/dummy.py

@@ -0,0 +1,68 @@
+import logging
+import random
+import math
+from datetime import datetime, timedelta, time
+from inputs.common import Input as Inp
+
+from structures.measurement import Measurement24v, Measurement480v
+from structures.plant import S7State, CompactLogixState
+
+logger = logging.getLogger(__name__)
+localtz = datetime.now().astimezone().tzinfo
+
+def f():
+  return random.random()
+
+def b():
+  return random.choice([True, False])
+
+def i(count=100):
+  return random.randint(0, count-1)
+
+class Input(Inp):
+  def __init__(self, message) -> None:
+    super().__init__(self.read_handler)
+    logger.debug(message)
+    self.interval = 0.01
+  
+  def read_handler(self):
+    current = datetime.now()
+    current_td = timedelta(
+      hours = current.hour, 
+      minutes = current.minute, 
+      seconds = current.second, 
+      microseconds = current.microsecond)
+
+    # discretize to specified resolution
+    to_sec = timedelta(seconds = round(current_td.total_seconds(), max(0, int(-math.log10(self.interval)))))
+    timestamp = (datetime.combine(current, time(0)) + to_sec).astimezone(localtz)
+
+    self._q.put(Measurement24v(
+      timestamp - timedelta(seconds=0.05), 
+      "dummy24v", 
+      (f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f()),
+      (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
+      (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
+      (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
+      (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
+      (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
+      f() + 23.5
+    ))
+    self._q.put(Measurement480v(
+      timestamp - timedelta(seconds=0.03),
+      "dummy480v",
+      (f()+230, f()+230, f()+230),
+      (f(), f(), f()),
+      (i(360), i(360), i(360))
+    ))
+    self._q.put(CompactLogixState(
+      timestamp,
+      "dummyAB",
+      i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
+      i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
+      i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
+      i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
+      i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
+      i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), 
+      i(2), i(2)
+    ))

+ 73 - 0
app/inputs/replay_influxdb.py

@@ -0,0 +1,73 @@
+import logging, time
+from datetime import datetime, timedelta
+from influxdb_client import InfluxDBClient
+
+from inputs.common import Input
+from structures.measurement import Measurement24v, Measurement480v
+from structures.plant import CompactLogixState, S7State
+
+dataclasses = [
+  Measurement24v,
+  Measurement480v,
+  S7State,
+  CompactLogixState,
+]
+
+logger = logging.getLogger(__name__)
+
+class Replay(Input):
+  def __init__(self, url, token, org, bucket, start_time) -> None:
+    super().__init__(self.read_handler)
+    self.interval = 1.0
+    self.client = InfluxDBClient(url, token, org=org)
+    self.bucket = bucket
+
+    self.query_api = self.client.query_api()
+    self.current_time = datetime.strptime(start_time, "%d.%m.%Y %H:%M:%S %z")
+    self.time_offset = datetime.now().astimezone() - self.current_time
+  
+  def read_handler(self):
+    start = self.current_time
+    end = start + timedelta(seconds=1)
+    for result in self.query(start, end):
+      self._q.put(result)
+    self.current_time = end
+
+  def query(self, start, stop):
+    query = f'from(bucket:"{self.bucket}")\
+      |> range(start: {start.isoformat()}, stop: {stop.isoformat()})\
+      |> yield(name: "m")'
+    result = self.query_api.query(query=query)
+    results = []
+    fields = {}
+    dataclass = old_dataclass = None
+    for table in result:
+      
+      if table.records:
+        record = table.records[0]
+        for cls in dataclasses:
+          if record.get_measurement() == cls.series:
+            dataclass = cls
+            break
+      
+      if old_dataclass != dataclass:
+        results.extend(self.populate_dataclasses(old_dataclass, fields))
+        fields = {}
+        old_dataclass = dataclass
+
+      for record in table.records:
+        if not record.get_time() in fields:
+          fields[record.get_time()] = {}
+        field = fields[record.get_time()]
+        if 'channel' in record.values:
+          field[record.get_field()] = field[record.get_field()] + (record.get_value(), ) if record.get_field() in field else (record.get_value(), )
+        else:
+          field[record.get_field()] = record.get_value()
+        field['source'] = record['source']
+
+    if dataclass:
+      results.extend(self.populate_dataclasses(dataclass, fields))
+    return results
+
+  def populate_dataclasses(self, dataclass, fields):
+    for time, values in fields.items():
+      yield dataclass(time + self.time_offset, **values)

+ 59 - 0
app/inputs/rockwell/allen_bradley_connect.py

@@ -0,0 +1,59 @@
+import logging
+
+from pylogix import PLC
+from datetime import datetime
+
+from structures.plant import *
+from structures.measurement import *
+from inputs.common import Input
+
+localtz = datetime.now().astimezone().tzinfo
+logger = logging.getLogger(__name__)
+
+class AllenBradleyCPU(Input):
+ 
+  def __init__(self, host):
+    super().__init__(self.read_handler)
+    self.comm = PLC()
+    self.comm.IPAddress = host
+    self.interval = 0.02
+    self.cpu_state_tags = {
+      "name": "B1[0]",
+      
+    }
+    self.measurement_tags = {
+      "24V": "B200",
+      "480V": "B201"
+    }
+    self.tags = list(self.cpu_state_tags.values()) + list(self.measurement_tags.values())
+    
+  def read_handler(self):
+    timestamp = datetime.now(localtz)
+    ret = self.comm.Read(self.tags)
+    if ret[0].Status == "Success":
+      cpu_values = {t: r.Value for t, r in zip(self.cpu_state_tags, ret)}
+      self._q.put(CompactLogixState(timestamp, "AB", **cpu_values))
+      
+      offset = len(self.cpu_state_tags)  # index of the first IFM value in the reply
+      ifm_values_count = 22
+      data = [r.Value for r in ret[offset:offset+ifm_values_count]]
+      channels = 16
+      self._q.put(Measurement24v(timestamp, "AB", 
+        current = tuple([x / 10 for x in data[0:channels]]),
+        status = tuple([data[16] & (1 << i) > 0 for i in range(channels)]),
+        overload = tuple([data[17] & (1 << i) > 0 for i in range(channels)]),
+        short_circuit = tuple([data[18] & (1 << i) > 0 for i in range(channels)]),
+        limit = tuple([data[19] & (1 << i) > 0 for i in range(channels)]),
+        pushbutton = tuple([data[20] & (1 << i) > 0 for i in range(channels)]),
+        voltage = data[22] / 100
+      ))
+      offset += ifm_values_count
+      data = [r.Value for r in ret[offset:offset+9]]
+      self._q.put(Measurement480v(timestamp, "AB", 
+        voltage = tuple(data[0:3]),
+        current = tuple(data[3:6]),
+        phase = tuple(data[6:9])
+      ))
+
+    else:
+      logger.error("CPU read: " + ret[0].Status)

+ 67 - 0
app/inputs/siemens/snap7_connect.py

@@ -0,0 +1,67 @@
+import logging
+
+from datetime import datetime
+import struct
+
+from snap7.client import Client
+from snap7.exceptions import Snap7Exception
+from snap7.types import Areas
+
+from structures.plant import *
+from inputs.common import Input
+
+localtz = datetime.now().astimezone().tzinfo
+logger = logging.getLogger(__name__)
+
+class SiemensCPU(Input):
+
+  interval = 0.05
+
+  def __init__(self, address) -> None:
+    super().__init__(self.read_handler)
+    self.address = address
+    self.cpu = Client()
+
+  def start(self):
+    try:
+      self.cpu.connect(self.address, rack=0, slot=0)
+    except Snap7Exception as ex:
+      logger.exception(ex)
+    super().start()
+
+  def read_handler(self):
+    timestamp = datetime.now(localtz)
+    cpu_state = self.cpu.get_cpu_state() == "S7CpuStatusRun"
+
+    if self.cpu.get_connected():
+      try:
+        status = self.cpu.read_area(area=Areas.DB, dbnumber=3, start=0, size=5)
+        #inputs = self.cpu.read_area(area=Areas.PE, dbnumber=0, start=32, size=112-32)
+      except Snap7Exception as ex:
+        if "TCP" in str(ex):
+          self.cpu.disconnect()
+          return
+        else:
+          raise ex
+    else:
+      self.cpu.disconnect()
+      self.cpu.connect(self.address, rack=0, slot=0)
+      return
+
+    hydraulics_powered = status[0] & 1 > 0
+
+    data = struct.unpack(">BBBBB", status)
+    #print(''.join(["{:02X}".format(x) for x in inputs]))
+
+    # TODO: Parse data
+
+    self._q.put(PlantState(timestamp, "S7", cpu_state, *data))
+
+  def get_timestamp(self, cpu_time):
+    if not self.cpu_start_time:
+      self.synchronize()
+    cpu_diff = cpu_time - self.cpu_start_time
+    date = datetime.fromtimestamp(self.local_start_time + cpu_diff, localtz)
+    return date
+
+      

+ 65 - 0
app/inputs/siemens/snap7_server.py

@@ -0,0 +1,65 @@
+import snap7
+import logging
+import struct
+import re
+from datetime import datetime, tzinfo
+
+from inputs.common import Input
+
+localtz = datetime.now().astimezone().tzinfo
+logger = logging.getLogger(__name__)
+
+class SiemensServer(Input):
+  interval = 0.02
+
+  time_offset = None
+
+  def __init__(self, port = 102):
+    super().__init__(self.read_handler)
+    self.server = snap7.server.Server(True)
+    size = 100
+    self.DB1 = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
+    self.DB2 = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
+    self.server.register_area(snap7.types.srvAreaDB, 1, self.DB1)
+    self.server.register_area(snap7.types.srvAreaDB, 2, self.DB2)
+    self.server.start(port)
+
+  def read_handler(self):
+    event : snap7.types.SrvEvent
+    while True:
+      event = self.server.pick_event()
+      if not event:
+        break
+      text = self.server.event_text(event)
+      match = re.match(r"^(?P<datetime>\d+-\d+-\d+ \d+:\d+:\d+) \[(?P<host>[\w\.:]+)\] (?P<type>[\w ]+), Area : (?P<area>.+), Start : (?P<start>\d+), Size : (?P<size>\d+) --> (?P<response>.+)$", text)
+      if not match:
+        logger.warning(text)
+        continue
+
+      if match.group("type") != "Write request":
+        logger.warning(text)
+        continue
+      
+      if int(match.group("start")) + int(match.group("size")) <= 4:
+        continue
+
+      if match.group("area") == "DB1":
+        raw = bytearray(self.DB1)
+        timestamp = self.get_timestamp(raw[0:4])
+        self.queue_ifm_from_bytes("S7", timestamp, raw[4:34])
+      elif match.group("area") == "DB2":
+        raw = bytearray(self.DB2)
+        timestamp = self.get_timestamp(raw[0:4])
+        self.queue_energy_meter_from_bytes("S7", timestamp, raw[4:40])
+
+  def get_timestamp(self, raw):
+    now = datetime.now(localtz)
+    cpu_time = struct.unpack(">I", raw)[0] / 1000
+    offset = now.timestamp() - cpu_time
+    if self.time_offset:
+      self.time_offset = self.time_offset * 0.999 + offset * 0.001
+    else:
+      self.time_offset = offset
+    
+    timestamp = datetime.fromtimestamp(self.time_offset + cpu_time, localtz)
+    return timestamp

+ 101 - 0
app/main.py

@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+
+import time
+import yaml
+import argparse
+import logging
+from logging.config import dictConfig
+from importlib import import_module
+
+# get config file from arguments
+
+parser = argparse.ArgumentParser(description='PLC Connector')
+parser.add_argument('-c', '--config', type=str, default='config.yml', help='config file')
+args = parser.parse_args()
+
+# read config
+config = yaml.safe_load(open(args.config, 'r'))
+
+dictConfig(config['Logging'])
+logger = logging.getLogger(__name__)
+
+def createModules(configItems, type):
+  for item in configItems:
+    cls = next(iter(item))
+    module = import_module(f"{type}s.{item[cls]}")
+    if item.get('enabled') == False:
+      continue
+    params = item.copy()
+    params.pop(cls, None)
+    params.pop('enabled', None)
+    params.pop('submodules', None)
+    params.pop('enable_output', None)
+    try:
+      yield getattr(module, cls)(**params)
+    except Exception as ex:
+      logger.fatal(F"{type} {cls} couldn't be initialized.", exc_info=False)
+      raise
+
+# setup input modules
+inputs = list(createModules(config['Inputs'], "input"))
+
+# setup middlewares recursively
+def createMiddlewares(configItems, parent = None):
+  items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
+  middlewares = list(createModules(items, "middleware"))
+  for (item, middleware) in zip(items, middlewares):
+    if 'submodules' in item:
+      middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
+      middleware.enable_output = item.get('enable_output', False)
+    else:
+      middleware.enable_output = item.get('enable_output', True)
+
+  return middlewares
+
+middlewares = createMiddlewares(config['Middlewares'])
+
+# setup output modules
+outputs = list(createModules(config['Outputs'], "output"))
+
+
+for source in inputs:
+  source.start()
+
+logger.debug("started sources")
+
+def executeMiddleware(middleware, values):
+  submodules = getattr(middleware, 'submodules', [])
+  result = list(middleware.execute(values))
+  if not submodules and middleware.enable_output:
+    return result
+  else:
+    subResults = []
+    for submodule in submodules:
+      subResults += executeMiddleware(submodule, result)
+    return subResults
+
+while True:
+  values = set()
+  for input in inputs:
+    values.update(input.read())
+
+  # sort the set by timestamp and series
+  values = sorted(values, key=lambda x: (x.timestamp, x.series))
+
+  # execute middlewares recursively and collect results of leaf modules
+
+  results = set()
+  for middleware in middlewares:
+    tmp = executeMiddleware(middleware, values)
+    if tmp:
+      results.update(tmp)
+  if not middlewares:
+    results = values
+
+  # sort the set by timestamp and series
+  results = sorted(results, key=lambda x: (x.timestamp, x.series))
+
+  for output in outputs:
+    output.write(results)
+    
+  time.sleep(1.9)

+ 85 - 0
app/middlewares/aggregators.py

@@ -0,0 +1,85 @@
+
+from datetime import timedelta
+from dataclasses import dataclass, fields
+from .common import MatchSeries, ALLOWED_NAMES, ALLOWED_GLOBALS
+
+"""
+This middleware aggregates fields over a specified timespan.
+"""
+class Aggregate(MatchSeries):
+  def __init__(self, parent, series, timespan, avg=[], sum=[], last=[], first=[], min=[], max=[]) -> None:
+    super().__init__(series)
+    self._timespan = timedelta(seconds=timespan)
+    self._avg = avg
+    self._sum = sum
+    self._last = last
+    self._first = first
+    self._min = min
+    self._max = max
+    self._trigger_time = None
+    self._datasets = []
+
+    labels = {f: 'avg' for f in self._avg}
+    labels.update({f: 'sum' for f in self._sum})
+    labels.update({f: 'last' for f in self._last})
+    labels.update({f: 'first' for f in self._first})
+    labels.update({f: 'min' for f in self._min})
+    labels.update({f: 'max' for f in self._max})
+
+    self._name = f"{series} ({self._timespan.total_seconds()}s) {', '.join(f'{v}({k})' for k, v in labels.items())}"
+
+  def execute(self, values):
+    hasTriggered = False
+    for measurement in values:
+      dataset = self.get_series(measurement)
+      if not dataset:
+        continue
+
+      self._last_measurement = measurement
+      
+      # set trigger time if not set
+      if self._trigger_time is None:
+        self._trigger_time = dataset.timestamp + self._timespan
+      
+      # check if we need to trigger
+      if dataset.timestamp >= self._trigger_time and self._datasets:
+        hasTriggered = True
+        yield self.set_series(self._last_measurement, self.trigger())
+
+      self._datasets.append(dataset)
+    # trigger if we haven't received any new data or the trigger time has passed
+    if (not hasTriggered and 
+        self._datasets and 
+        self._trigger_time and
+        ((values and values[-1].timestamp >= self._trigger_time) or not values)):
+      yield self.set_series(self._last_measurement, self.trigger())
+
+
+  def apply_function(self, func, fields: list):
+    last = self._datasets[-1]
+    for field in fields:
+      if isinstance(getattr(last, field), tuple):
+        n = range(len(getattr(last, field)))
+        yield (field, tuple(func(getattr(x, field)[i] for x in self._datasets) for i in n))
+      else:
+        yield (field, func(getattr(x, field) for x in self._datasets))
+
+  def trigger(self):
+    last = self._datasets[-1]
+    field_dict = last.__dict__.copy()
+    field_dict['series'] = self._name
+    field_dict['source'] = 'aggregation'
+
+    # apply aggregation functions
+    field_dict.update(self.apply_function(lambda v: sum(v) / len(self._datasets), self._avg))
+    field_dict.update(self.apply_function(sum, self._sum))
+    field_dict.update((f, getattr(self._datasets[-1], f, 0)) for f in self._last)
+    field_dict.update((f, getattr(self._datasets[0], f, 0)) for f in self._first)
+    field_dict.update(self.apply_function(min, self._min))
+    field_dict.update(self.apply_function(max, self._max))
+
+    self._trigger_time = None
+    self._datasets = []
+    return type(last)(**field_dict)
+    
+

+ 47 - 0
app/middlewares/common.py

@@ -0,0 +1,47 @@
+from dataclasses import fields
+
+from structures.common import BaseMeasurement
+from structures.measurement import Measurement24v, Measurement480v
+from structures.plant import S7State, CompactLogixState
+from structures.correlated import CorrelatedMeasurements
+
+class MatchSeries:
+  def __init__(self, series) -> None:
+    self._series = series
+  
+  def get_series(self, measurement: BaseMeasurement):
+    if measurement.series == self._series:
+      return measurement
+    else:
+      # find the series in the data
+      for key, value in measurement.__dict__.items():
+        if isinstance(value, BaseMeasurement) and value.series == self._series:
+          return value
+
+  def set_series(self, measurement: BaseMeasurement, series: BaseMeasurement):
+    if measurement.series == self._series:
+      return series
+    else:
+      # find the series in the data
+      for key, value in measurement.__dict__.items():
+        if isinstance(value, BaseMeasurement) and value.series == self._series:
+          return type(measurement)(**{**measurement.__dict__, key: series})
+
+ALLOWED_GLOBALS = {
+  'sum': sum,
+  'min': min,
+  'max': max,
+  'avg': lambda x: sum(x) / len(x),
+  'count': len,
+  'last': lambda x: x[-1],
+}
+
+ALLOWED_NAMES = \
+  [x.name for x in fields(Measurement24v)] + \
+  [x.name for x in fields(Measurement480v)] + \
+  [x.name for x in fields(CompactLogixState)] + \
+  [x.name for x in fields(S7State)] + \
+  [x.name for x in fields(CorrelatedMeasurements)] + \
+  list(ALLOWED_GLOBALS.keys())
+
+ALLOWED_NAMES = set([name for name in ALLOWED_NAMES if not name.startswith('_')])

+ 44 - 0
app/middlewares/debug.py

@@ -0,0 +1,44 @@
+import logging
+import time
+
+logger = logging.getLogger(__name__)
+
+class PrintStats:
+  def __init__(self, parent):
+    self.startTime = time.monotonic()
+
+  def execute(self, values):
+    counts = {}
+    dt = time.monotonic() - self.startTime
+    self.startTime = time.monotonic()
+    text = ""
+    warn = False
+    for meas in values:
+      id = "{} {}".format(meas.series, meas.source)
+      if id in counts:
+        counts[id] += 1
+      else:
+        counts[id] = 1
+    if counts:
+      ids = list(counts.keys())
+      ids.sort()
+      for id in ids:
+        text += "{}: {:4d} in {:.03f}s, {:.1f}/s    ".format(id, counts[id], dt, counts[id] / dt)
+    else:
+      text = "0 Messungen in {:.03f}s               ".format(dt)
+      warn = True
+
+    if warn:
+      logger.warning(text)
+    else:
+      logger.info(text)
+    return values
+
+class Warning:
+  def __init__(self, parent):
+    pass
+
+  def execute(self, values):
+    for meas in values:
+      logger.warning(str(meas))
+      yield meas

+ 70 - 0
app/middlewares/filters.py

@@ -0,0 +1,70 @@
+import logging
+from .common import MatchSeries, ALLOWED_NAMES, ALLOWED_GLOBALS
+
+logger = logging.getLogger(__name__)
+"""
+This middleware filters the measurements by series and yields if any given field matches.
+"""
+class MatchAny(MatchSeries):
+  def __init__(self, parent, series, **kwargs) -> None:
+    super().__init__(series)
+    self._fields = kwargs
+
+  def execute(self, values):
+    for measurement in values:
+      dataset = self.get_series(measurement)
+
+      if not dataset:
+        continue
+      
+      if not self._fields:
+        yield measurement
+        continue
+
+      # check if any field matches
+      for field, value in self._fields.items():
+        v = getattr(dataset, field, None)
+        if v == value or (isinstance(v, tuple) and value in v):
+          yield measurement
+          break
+      
+"""
+This middleware filters the measurements by series and yields if all given fields match.
+"""
+class MatchAll(MatchSeries):
+  def __init__(self, parent, series, **kwargs) -> None:
+    super().__init__(series)
+    self._fields = kwargs
+
+  def execute(self, values):
+    for measurement in values:
+      dataset = self.get_series(measurement)
+      if not dataset:
+        continue
+
+      # check if all fields match
+      success = True
+      for field, value in self._fields.items():
+        v = getattr(dataset, field, None)
+        if (not isinstance(v, tuple) and v != value) or (isinstance(v, tuple) and not all(x == value for x in v)):
+          success = False
+          break
+      if success:
+        yield measurement
+
+class ComplexFilter():
+  def __init__(self, parent, predicate) -> None:
+    self._predicate = predicate
+    self._compiled = compile(predicate, "<string>", "eval")
+    # Validate allowed names
+    for name in self._compiled.co_names:
+        if name not in ALLOWED_NAMES:
+            raise NameError(f"The use of '{name}' is not allowed in '{predicate}'")
+
+  def execute(self, values):
+    for measurement in values:
+      try:
+        if eval(self._compiled, {"__builtins__": ALLOWED_GLOBALS}, measurement.__dict__):
+          yield measurement
+      except Exception as e:
+        logger.error(f"Error while evaluating predicate '{self._predicate}': {e}")

+ 34 - 0
app/middlewares/selectors.py

@@ -0,0 +1,34 @@
+import logging
+import re
+import dataclasses
+from .common import ALLOWED_NAMES, ALLOWED_GLOBALS
+from structures.common import BaseMeasurement
+
+logger = logging.getLogger(__name__)
+
+@dataclasses.dataclass(frozen=True)
+class Selection(BaseMeasurement):
+  value: str
+  series: str
+
+class ComplexSelector():
+  def __init__(self, parent, selector) -> None:
+    self._selector = selector
+    self._compiled = compile(selector, "<string>", "eval")
+    # Validate allowed names
+    for name in self._compiled.co_names:
+        if name not in ALLOWED_NAMES:
+            raise NameError(f"The use of '{name}' is not allowed in '{selector}'")
+
+  def execute(self, values):
+    for measurement in values:
+      try:
+        value = eval(self._compiled, {"__builtins__": ALLOWED_GLOBALS}, measurement.__dict__)
+        yield Selection(
+          timestamp = measurement.timestamp,
+          series = re.match(r"[\w_\.\-\(\)]+", self._selector).group(0),
+          source = "selection",
+          value=value
+        )
+      except Exception as e:
+        logger.error(f"Error while evaluating selector '{self._selector}': {e}")

+ 40 - 0
app/middlewares/time_correlation.py

@@ -0,0 +1,40 @@
+import logging
+from datetime import timedelta
+
+from structures.correlated import CorrelatedMeasurements
+
+logger = logging.getLogger(__name__)
+
+class TimeCorrelation:
+  def __init__(self, parent):
+    self.state = {}
+    self.timestamp = None
+    self.old_values = []
+
+  def execute(self, values: list):
+    # combine new values with old values and sort them by timestamp and series
+    values = sorted(self.old_values + values, key=lambda x: (x.timestamp, x.series))
+
+    results = []
+    # iterate over old values
+    for i, measurement in enumerate(values[:len(self.old_values)]):
+      self.state[type(measurement).__name__] = measurement
+
+      if self.timestamp and self.timestamp > measurement.timestamp:
+        logger.error(f"Timestamps are not in order: {measurement.series} is {self.timestamp - measurement.timestamp} to late")
+      
+      if len(values) > i+1 and values[i+1].timestamp == measurement.timestamp:
+        continue
+
+      self.timestamp = measurement.timestamp
+      results.append(CorrelatedMeasurements(
+        timestamp = measurement.timestamp,
+        source = ','.join([x.source for x in self.state.values()]),
+        measurement_24v = self.state.get("Measurement24v", None),
+        measurement_480v = self.state.get("Measurement480v", None),
+        measurement_plant = self.state.get("CompactLogixState", None) or self.state.get("S7State", None)
+      ))
+
+    # store new values for next iteration
+    self.old_values = values[len(self.old_values):]
+    return results

+ 79 - 0
app/outputs/csv_file.py

@@ -0,0 +1,79 @@
+import csv
+import os
+from datetime import datetime
+import dataclasses
+import zipfile
+import logging
+
+from structures.common import BaseMeasurement
+
+logger = logging.getLogger(__name__)
+
+class CSVStorage:
+  files = {}
+
+  def __init__(self, path) -> None:
+    self.path = path
+    self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
+  
+  def write(self, values: list):
+    try:
+      for meas in values:
+        if not meas.series in self.files:
+          self.files[meas.series] = CSVFile(self.path, meas.series, self.zipname)
+        self.files[meas.series].write(meas)
+    except Exception as ex:
+      logger.exception("CSV write failed")
+
+class CSVFile:
+  file = None
+  filename = None
+  row_count = 0
+
+  def __init__(self, path, series, zipname) -> None:
+    self.path = path
+    self.series = series
+    if not os.path.exists(self.path):
+      os.mkdir(self.path)
+    self.zipname = zipname
+    self.new_file()
+
+  def new_file(self):
+
+    if self.file:
+      self.file.close()
+      with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
+        zf.write(self.filename, os.path.basename(self.filename))
+      os.remove(self.filename)
+      
+    self.filename = os.path.join(self.path, F"{self.series}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
+    self.file = open(self.filename, "w", newline='')
+    self.writer = csv.writer(self.file, delimiter=',')
+
+  def write(self, meas):
+    row = dataclass_to_dict(meas)
+    if self.row_count == 0:
+      self.writer.writerow(row.keys())
+    self.writer.writerow(row.values())
+    self.row_count += 1
+
+    if self.row_count % 1000 == 0:
+      self.file.flush()
+
+    if self.row_count > 50000:
+      self.new_file()
+      self.row_count = 0
+
+
+def dataclass_to_dict(dc, prefix=""):
+  ret = {}
+  for field in dataclasses.fields(dc):
+    value = getattr(dc, field.name)
+    if type(value) is tuple:
+      for i, v in enumerate(value):
+        ret[F"{prefix}{field.name}_{i}"] = v
+    elif isinstance(value, BaseMeasurement):
+      ret.update(dataclass_to_dict(value, F"{prefix}{value.series}_"))
+    else:
+      ret[F"{prefix}{field.name}"] = value
+  return ret

+ 41 - 0
app/outputs/influxdb.py

@@ -0,0 +1,41 @@
+import logging
+
+from influxdb_client import InfluxDBClient, Point
+from influxdb_client.client.write_api import SYNCHRONOUS
+import dataclasses
+
+logger = logging.getLogger(__name__)
+
+class InfluxDB:
+  def __init__(self, url, token, org, bucket):
+    self.client = InfluxDBClient(url, token, org=org)
+
+    self.bucket = bucket
+
+    self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
+    self.query_api = self.client.query_api()
+
+  def write(self, values):
+    points = []
+    for meas in values:
+      p = Point(meas.series).time(meas.timestamp).tag("source", meas.source)
+      for field in dataclasses.fields(meas):
+        if not field.name in ["timestamp", "series", "source"]:
+          value = getattr(meas, field.name)
+          if type(value) is bool:
+            p.field(field.name, int(value))
+          elif not type(value) is tuple:
+            p.field(field.name, value)
+          else:
+            for i, v in enumerate(value):
+              pt = Point(meas.series).time(meas.timestamp).tag("source", meas.source).tag("channel", i)
+              if type(v) is bool:
+                pt.field(F"{field.name}", int(v))
+              else:
+                pt.field(F"{field.name}", v)
+              points.append(pt)
+      points.append(p)
+    try:
+      self.write_api.write(bucket=self.bucket, record=points)
+    except Exception as ex:
+      logger.exception("Influx DB write failed")

+ 20 - 0
app/outputs/stdout.py

@@ -0,0 +1,20 @@
+import logging
+import datetime
+import json
+from dataclasses import asdict
+
+logger = logging.getLogger(__name__)
+
+class DateTimeEncoder(json.JSONEncoder):
+  def default(self, z):
+    if isinstance(z, datetime.datetime):
+      return z.isoformat()
+    else:
+      return super().default(z)
+
+class JSONOutput:
+  
+  def write(self, values: set):
+    for measurement in values:
+      d = asdict(measurement)
+      print(json.dumps(d, cls=DateTimeEncoder))

BIN
app/requirements.txt


+ 9 - 0
app/structures/common.py

@@ -0,0 +1,9 @@
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Hashable
+
+@dataclass(frozen=True)
+class BaseMeasurement(Hashable):
+  timestamp: datetime
+  source: str
+  #series: str = field(init=False)

+ 9 - 0
app/structures/correlated.py

@@ -0,0 +1,9 @@
+from dataclasses import dataclass, field
+from structures.common import BaseMeasurement
+
+@dataclass(frozen=True)
+class CorrelatedMeasurements(BaseMeasurement):
+  series: str = field(default="correlated", init=False)
+  measurement_24v: BaseMeasurement
+  measurement_480v: BaseMeasurement
+  measurement_plant: BaseMeasurement

+ 23 - 0
app/structures/measurement.py

@@ -0,0 +1,23 @@
+from dataclasses import dataclass, field
+
+from .common import BaseMeasurement
+
+@dataclass(frozen=True)
+class Measurement24v(BaseMeasurement):
+  current: tuple # [float, ...]
+  status: tuple # [bool, ...]
+  overload: tuple # [bool, ...]
+  short_circuit: tuple # [bool, ...]
+  limit: tuple # [bool, ...]
+  pushbutton: tuple # [bool, ...]
+  voltage: float
+
+  series: str = field(default="24v")
+
+@dataclass(frozen=True)
+class Measurement480v(BaseMeasurement):
+  voltage: tuple # [float, ...]
+  current: tuple # [float, ...]
+  phase: tuple # [float, ...]
+
+  series: str = field(default="480v")

+ 19 - 0
app/structures/plant.py

@@ -0,0 +1,19 @@
+from dataclasses import dataclass, field
+
+from .common import BaseMeasurement
+
+@dataclass(frozen=True)
+class CompactLogixState(BaseMeasurement):
+  field1: int
+  field2: bool
+
+  series: str = field(default="plant")
+
+@dataclass(frozen=True)
+class S7State(BaseMeasurement):
+  cpu_running: bool
+  
+  field1: int
+  field2: bool
+
+  series: str = field(default="plant", init=False)

+ 74 - 0
config/config.yml

@@ -0,0 +1,74 @@
+
+Inputs:
+  - SiemensCPU: siemens.snap7_connect
+    enabled: False
+    host: "<S7 PLC IP>"
+
+  - SiemensServer: siemens.snap7_server
+    enabled: False
+    port: 102
+
+  - Balluff: balluff.balluff_html
+    enabled: False
+
+  - AllenBradleyCPU: rockwell.allen_bradley_connect
+    host: "<AB PLC IP>"
+    enabled: True
+
+  - Replay: replay_influxdb
+    enabled: False
+    url: "http://influxdb:8086"
+    token: "<token>"
+    org: "myorg"
+    bucket: "energy-monitor"
+    start_time: 01.01.2000 01:00:00 +02:00
+
+Middlewares:
+  - PrintStats: debug
+    enabled: False
+  - TimeCorrelation: time_correlation
+    submodules:
+    - PrintStats: debug
+
+Outputs:
+  - CSVStorage: csv_file
+    path: logs
+
+  - InfluxDB: influxdb
+    url: "http://influxdb:8086"
+    token: "<token>"
+    org: "myorg"
+    bucket: "energy-monitor"
+
+Logging:
+  version: 1
+  formatters:
+    standard:
+      format: "%(asctime)s [%(levelname)s]   \t%(name)s: \t%(message)s"
+  handlers:
+    default:
+      level: INFO
+      formatter: standard
+      class: logging.StreamHandler
+      stream: ext://sys.stderr
+  loggers:
+    '':
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false
+    inputs.rockwell.allen_bradley_connect:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    snap7.server:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    __main__:
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false

+ 98 - 0
config/dummy-config.yml

@@ -0,0 +1,98 @@
+
+Inputs:
+  - SiemensCPU: siemens.snap7_connect
+    enabled: False
+    host: "<S7 PLC IP>"
+
+  - SiemensServer: siemens.snap7_server
+    enabled: False
+    port: 102
+
+  - Balluff: balluff.balluff_html
+    enabled: False
+
+  - AllenBradleyCPU: rockwell.allen_bradley_connect
+    enabled: False
+    host: "<AB PLC IP>"
+  
+  - Input: dummy
+    message: Hello World!
+
+  - Replay: replay_influxdb
+    enabled: False
+    url: "http://influxdb:8086"
+    token: "<token>"
+    org: "myorg"
+    bucket: "energy-monitor"
+    start_time: 01.01.2000 01:00:00 +02:00
+
+Middlewares:
+  - Warning: debug
+    enabled: False
+    enable_output: False
+  - TimeCorrelation: time_correlation
+    submodules:
+    - MatchAny: filters
+      series: plant
+      enable_output: False
+      table_move_up: 1
+    - ComplexFilter: filters
+      predicate: "measurement_480v and measurement_24v and avg(measurement_24v.current) > 0.6"
+      submodules:
+      - ComplexSelector: selectors
+        selector: "avg(measurement_480v.current)"
+        enable_output: False
+    - Aggregate: aggregators
+      series: 24v
+      timespan: 0.1
+      avg:
+      - voltage
+      min:
+      - status
+      - overload
+      submodules:
+      - PrintStats: debug
+        enable_output: True
+
+
+
+Outputs:
+  - CSVStorage: csv_file
+    path: dummy-logs
+    enabled: True
+  - JSONOutput: stdout
+    enabled: False
+
+Logging:
+  version: 1
+  formatters:
+    standard:
+      #format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
+      format: "[%(levelname)s] %(name)s: %(message)s"
+  handlers:
+    default:
+      level: INFO
+      formatter: standard
+      class: logging.StreamHandler
+      stream: ext://sys.stderr
+  loggers:
+    '':
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false
+    inputs.rockwell.allen_bradley_connect:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    snap7.server:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    __main__:
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false

+ 76 - 0
config/replay-config.yml

@@ -0,0 +1,76 @@
+
+Inputs:
+  - SiemensCPU: siemens.snap7_connect
+    enabled: False
+    host: "<S7 PLC IP>"
+
+  - SiemensServer: siemens.snap7_server
+    enabled: False
+    port: 102
+
+  - Balluff: balluff.balluff_html
+    enabled: False
+
+  - AllenBradleyCPU: rockwell.allen_bradley_connect
+    host: "<AB PLC IP>"
+    enabled: False
+
+  - Replay: replay_influxdb
+    enabled: True
+    url: "http://influxdb-replay:8086"
+    token: "<token>"
+    org: "myorg"
+    bucket: "energy-monitor"
+    start_time: 01.01.2000 01:00:00 +02:00
+
+Middlewares:
+  - PrintStats: debug
+    enabled: True
+  - TimeCorrelation: time_correlation
+    enabled: False
+    enable_output: False
+    submodules:
+    - PrintStats: debug
+
+Outputs:
+  - CSVStorage: csv_file
+    path: logs
+
+  - InfluxDB: influxdb
+    url: "http://influxdb:8086"
+    token: "<token>"
+    org: "myorg"
+    bucket: "energy-monitor"
+
+Logging:
+  version: 1
+  formatters:
+    standard:
+      format: "%(asctime)s [%(levelname)s]   \t%(name)s: \t%(message)s"
+  handlers:
+    default:
+      level: INFO
+      formatter: standard
+      class: logging.StreamHandler
+      stream: ext://sys.stderr
+  loggers:
+    '':
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false
+    inputs.rockwell.allen_bradley_connect:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    snap7.server:
+      handlers:
+      - default
+      level: WARNING
+      propagate: false
+    __main__:
+      handlers:
+      - default
+      level: DEBUG
+      propagate: false

+ 50 - 0
docker-compose.yml

@@ -0,0 +1,50 @@
+version: "3.7"
+   
+services:
+  plc-connector:
+    container_name: plc-connector
+    build: .
+
+    restart: on-failure
+
+    ports:
+      - 102:102
+
+    volumes:
+      - ./logs:/app/logs/
+      - ./config/config.yml:/app/config.yml
+
+    networks:
+      - influxdb
+  
+  influxdb:
+    container_name: influxdb
+    image: influxdb:latest
+    
+    restart: unless-stopped
+
+    ports:
+      - 8086:8086
+
+    networks:
+      - influxdb
+
+    volumes:
+      - "./conf:/etc/influxdb2"
+      - "./influxdbv2:/var/lib/influxdb2"
+
+  influxdb-replay:
+    container_name: influxdb-replay
+    image: influxdb:latest
+    
+    restart: unless-stopped
+
+    networks:
+      - influxdb
+
+    volumes:
+      - "./influxdbv2-replay:/var/lib/influxdb2"
+
+networks:
+  influxdb:
+    name: influxdb