main.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. #!/usr/bin/env python3
  2. import time
  3. import yaml
  4. import argparse
  5. import logging
  6. from logging.config import dictConfig
  7. from importlib import import_module
  8. # get config file from arguments
  9. parser = argparse.ArgumentParser(description='PLC Connector')
  10. parser.add_argument('-c', '--config', type=str, default='config.yml', help='config file')
  11. args = parser.parse_args()
  12. # read config
  13. config = yaml.safe_load(open(args.config, 'r'))
  14. dictConfig(config['Logging'])
  15. logger = logging.getLogger(__name__)
  16. def createModules(configItems, type):
  17. for item in configItems:
  18. cls = next(iter(item))
  19. module = import_module(f"{type}s.{item[cls]}")
  20. if item.get('enabled') == False:
  21. continue
  22. params = item.copy()
  23. params.pop(cls, None)
  24. params.pop('enabled', None)
  25. params.pop('submodules', None)
  26. params.pop('enable_output', None)
  27. try:
  28. yield getattr(module, cls)(**params)
  29. except Exception as ex:
  30. logger.fatal(F"{type} {cls} couldn't be initialized.", exc_info=False)
  31. raise
  32. # setup input modules
  33. inputs = list(createModules(config['Inputs'], "input"))
  34. # setup middlewares recursively
  35. def createMiddlewares(configItems, parent = None):
  36. items = [dict(x, parent=parent) for x in configItems if x.get('enabled') != False]
  37. middlewares = list(createModules(items, "middleware"))
  38. for (item, middleware) in zip(items, middlewares):
  39. if 'submodules' in item:
  40. middleware.submodules = list(createMiddlewares(item['submodules'], middleware))
  41. middleware.enable_output = item.get('enable_output', False)
  42. else:
  43. middleware.enable_output = item.get('enable_output', True)
  44. return middlewares
  45. middlewares = createMiddlewares(config['Middlewares'])
  46. # setup output modules
  47. outputs = list(createModules(config['Outputs'], "output"))
  48. for source in inputs:
  49. source.start()
  50. logger.debug("started sources")
  51. def executeMiddleware(middleware, values):
  52. submodules = getattr(middleware, 'submodules', [])
  53. result = list(middleware.execute(values))
  54. if not submodules and middleware.enable_output:
  55. return result
  56. else:
  57. subResults = []
  58. for submodule in submodules:
  59. subResults += executeMiddleware(submodule, result)
  60. return subResults
  61. while True:
  62. values = set()
  63. for input in inputs:
  64. values.update(input.read())
  65. # sort the set by timestamp and series
  66. values = sorted(values, key=lambda x: (x.timestamp, x.series))
  67. # execute middlewares recursively and collect results of leaf modules
  68. results = set()
  69. for middleware in middlewares:
  70. tmp = executeMiddleware(middleware, values)
  71. if tmp:
  72. results.update(tmp)
  73. if not middlewares:
  74. results = values
  75. # sort the set by timestamp and series
  76. results = sorted(results, key=lambda x: (x.timestamp, x.series))
  77. for output in outputs:
  78. output.write(results)
  79. time.sleep(1.9)