dummy.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import logging
  2. import random
  3. import math
  4. from datetime import datetime, timedelta, time
  5. from inputs.common import Input as Inp
  6. from structures.measurement import Measurement24v, Measurement480v
  7. from structures.plant import S7State, CompactLogixState
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Local timezone, captured once at import time. Calling ``astimezone()`` with
# no argument attaches the system's local zone as a fixed-offset tzinfo, so
# this value does not follow a DST change that happens while the process runs.
localtz = datetime.now().astimezone().tzinfo
  10. def f():
  11. return random.random()
  12. def b():
  13. return random.choice([True, False])
  14. def i(count=100):
  15. return random.randint(0, count-1)
  16. class Input(Inp):
  17. def __init__(self, message) -> None:
  18. super().__init__(self.read_handler)
  19. logger.debug(message)
  20. self.interval = 0.01
  21. def read_handler(self):
  22. current = datetime.now()
  23. current_td = timedelta(
  24. hours = current.hour,
  25. minutes = current.minute,
  26. seconds = current.second,
  27. microseconds = current.microsecond)
  28. # discretize to specified resolution
  29. to_sec = timedelta(seconds = round(current_td.total_seconds(), max(0, int(-math.log10(self.interval)))))
  30. timestamp = (datetime.combine(current, time(0)) + to_sec).astimezone(localtz)
  31. self._q.put(Measurement24v(
  32. timestamp - timedelta(seconds=0.05),
  33. "dummy24v",
  34. (f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f(), f()),
  35. (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
  36. (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
  37. (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
  38. (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
  39. (b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b(), b()),
  40. f() + 23.5
  41. ))
  42. self._q.put(Measurement480v(
  43. timestamp - timedelta(seconds=0.03),
  44. "dummy480v",
  45. (f()+230, f()+230, f()+230),
  46. (f(), f(), f()),
  47. (i(360), i(360), i(360))
  48. ))
  49. self._q.put(CompactLogixState(
  50. timestamp,
  51. "dummyAB",
  52. i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2),
  53. i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2),
  54. i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2),
  55. i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2),
  56. i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2),
  57. i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2), i(2),
  58. i(2), i(2)
  59. ))