lora_mqtt.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. #!/usr/bin/env python3
  2. """
  3. MQTT Script to communicate with The Things Network and control digital output.
  4. Author: Tobias Müller
  5. Licence: GPL v3
  6. Date: 23-12-06
  7. Changelog:
  8. 23-12-06 - create initial program
  9. 23-12-07 - change output of payload
  10. 23-12-12 - finish publish mqtt code
  11. """
  12. from paho.mqtt import client as mqtt
  13. import pigpio
  14. from time import sleep
  15. from sys import exit
  16. import json
  17. import socket
  18. from base64 import b64encode
  # Credentials from The Things Network console:
  # https://eu1.cloud.thethings.network/console/ -> Applications -> Integrations -> MQTT
  USER = ""  # -> Username
  PASSWORD = ""  # -> Password

  # NOTE: "PUPLIC" and "PULISH" are misspellings of "PUBLIC" and "PUBLISH". The
  # names are kept as they are because the functions in this file reference them.
  PUPLIC_MQTT_TLS_ADDRESS = "eu1.cloud.thethings.network"
  PUPLIC_MQTT_TLS_PORT = 8883

  DEVICE_IDS = ["eui-70b3d5e75e01131f"]  # list of device IDs in an application
  DEVICE_PORT = 125  # standard port for watteco devices
  DEVICE_OUTPUT = 1  # value of 1 to 4

  # Priority to publish messages: "LOWEST", "LOW", "BELOW_NORMAL", "NORMAL",
  # "ABOVE_NORMAL", "HIGH", "HIGHEST"
  PULISH_PRIORITY = "NORMAL"

  # Report intervals, 0-32767, expressed in INTERVAL_UNIT. In production each
  # value must be >= 60min/3600s to comply with the legal duty cycle in the EU
  # at SF12.
  MIN_REPORT_INTERVAL = 60
  MAX_REPORT_INTERVAL = 120
  INTERVAL_UNIT = "sec"  # "sec" for seconds or "min" for minutes

  GPIO = 5  # GPIO to detect shutdown
  QOS = 0  # Quality of Service for MQTT
  DEBUG = False  # True: the message callback dumps the full uplink JSON

  # Note: On The Things Network's public community network a Fair Use
  # Policy applies which limits the uplink airtime to 30 seconds per
  # day (24 hours) per node and the downlink messages to 10
  # messages per day (24 hours) per node. Higher SF (Spreading Factor)
  # results in higher airtime usage and vice versa. Report intervals must
  # be inside those limits.
  # https://www.thethingsnetwork.org/forum/t/fair-use-policy-explained/1300
  def debounce(gpio_client, gpio=None, max_cycle=20, delay=0.01):
      """Sample a GPIO input until it has held one level long enough.

      The pin is read repeatedly; the function returns once ``max_cycle``
      consecutive reads produced the same level. ``sleep(delay)`` is called
      after every read.

      Args:
          gpio_client: Object exposing ``read(gpio)``; a truthy result counts
              as a high level.
          gpio: Pin number to read. Defaults to the module constant ``GPIO``.
          max_cycle: Number of identical consecutive reads required.
          delay: Pause in seconds after each read.

      Returns:
          0 when the input settled high, 1 when it settled low.

      Raises:
          ValueError: If ``max_cycle`` is smaller than 1.
      """
      if max_cycle < 1:
          raise ValueError("max_cycle must be at least 1")
      pin = GPIO if gpio is None else gpio

      level = None  # level seen on the previous read
      streak = 0  # how many consecutive reads returned `level`
      while streak < max_cycle:
          sample = bool(gpio_client.read(pin))
          if sample == level:
              streak += 1
          else:
              # Level changed (or first read): restart the count.
              level = sample
              streak = 1
          sleep(delay)
      # The result is inverted with respect to the pin level.
      return 0 if level else 1
  def set_payload(data, output=None):
      """Build the base64 downlink payload that carries a value for one output.

      The frame is the concatenation of Fctrl (1 byte, chosen by the output
      number), CmdID 0x50 (1 byte), ClusterID 0x0006 (2 bytes) and the data
      byte.

      Args:
          data: Value for the data byte, 0-255.
          output: Output number 1-4. Defaults to the module constant
              ``DEVICE_OUTPUT``.

      Returns:
          The frame encoded as a base64 string.

      Raises:
          ValueError: If the output number or the data value is out of range.
              The reason is printed first, as the other helpers in this file
              do before raising.
      """
      fctrl_by_output = {1: 0x11, 2: 0x31, 3: 0x51, 4: 0x71}
      channel = DEVICE_OUTPUT if output is None else output

      if channel not in fctrl_by_output:
          reason = F"Invalid device output {channel!r}, expected a value of 1 to 4!"
          print(reason)
          raise ValueError(reason)
      if not 0 <= data <= 0xFF:
          reason = F"Invalid output data {data!r}, expected a value of 0 to 255!"
          print(reason)
          raise ValueError(reason)

      # (value, number of hex digits) for every field, in frame order.
      fields = (
          (fctrl_by_output[channel], 2),  # Fctrl
          (0x50, 2),                      # CmdID
          (0x06, 4),                      # ClusterID
          (data, 2),                      # Data
      )
      hex_frame = "".join(F"{value:0{width}X}" for value, width in fields)
      return b64encode(bytes.fromhex(hex_frame)).decode()
  def report_payload(output=None, unit=None, min_interval=None, max_interval=None):
      """Build the base64 downlink payload that configures the report intervals.

      The frame is the concatenation of Fctrl (1 byte, chosen by the output
      number), CmdID 0x06, ClusterID 0x0006, AttributeID, Attribute_type,
      MinReport, MaxReport (2 bytes each) and ReportChange 0x01. MinReport and
      MaxReport carry the interval in the low 15 bits; the top bit is set when
      the unit is minutes.

      Args:
          output: Output number 1-4. Defaults to ``DEVICE_OUTPUT``.
          unit: "sec" or "min". Defaults to ``INTERVAL_UNIT``.
          min_interval: Minimum interval, 0-32767. Defaults to
              ``MIN_REPORT_INTERVAL``.
          max_interval: Maximum interval, 0-32767. Defaults to
              ``MAX_REPORT_INTERVAL``.

      Returns:
          The frame encoded as a base64 string.

      Raises:
          ValueError: If the output, unit or an interval is out of range. The
              reason is printed first, as the other helpers in this file do
              before raising.
      """
      fctrl_by_output = {1: 0x11, 2: 0x31, 3: 0x51, 4: 0x71}
      unit_flags = {"sec": 0x0000, "min": 0x8000}

      channel = DEVICE_OUTPUT if output is None else output
      unit_name = INTERVAL_UNIT if unit is None else unit
      shortest = MIN_REPORT_INTERVAL if min_interval is None else min_interval
      longest = MAX_REPORT_INTERVAL if max_interval is None else max_interval

      reason = None
      if channel not in fctrl_by_output:
          reason = F"Invalid device output {channel!r}, expected a value of 1 to 4!"
      elif unit_name not in unit_flags:
          reason = F"Invalid interval unit {unit_name!r}, expected \"sec\" or \"min\"!"
      elif not (0 <= shortest <= 0x7FFF and 0 <= longest <= 0x7FFF):
          # A larger value would spill into the unit flag bit.
          reason = "Report intervals must be within 0 to 32767!"
      if reason is not None:
          print(reason)
          raise ValueError(reason)

      flag = unit_flags[unit_name]
      # (value, number of hex digits) for every field, in frame order.
      fields = (
          (fctrl_by_output[channel], 2),  # Fctrl
          (0x06, 2),                      # CmdID
          (0x06, 4),                      # ClusterID
          (0x00, 4),                      # AttributeID
          (0x10, 4),                      # Attribute_type
          (flag | shortest, 4),           # MinReport
          (flag | longest, 4),            # MaxReport
          (0x01, 2),                      # ReportChange
      )
      hex_frame = "".join(F"{value:0{width}X}" for value, width in fields)
      return b64encode(bytes.fromhex(hex_frame)).decode()
  def mqtt_send(mqtt_client, payload):
      """Publish one downlink to every configured device.

      For each entry of ``DEVICE_IDS`` the message is published to
      ``v3/<USER>/devices/<device id>/down/push`` and the outcome is printed.

      Args:
          mqtt_client: Client whose ``publish(topic, message, qos)`` result
              exposes the return code at index 0 and the message ID at index 1.
          payload: Base64 string placed in the ``frm_payload`` field.

      Raises:
          ValueError: If ``DEVICE_IDS`` is empty.
      """
      if not DEVICE_IDS:
          print("There are no device IDs configured for MQTT-Subscription!")
          raise ValueError

      # json.dumps takes care of quoting/escaping; the compact separators keep
      # the message free of whitespace. The message is the same for every
      # device, so it is built once.
      message = json.dumps(
          {
              "downlinks": [
                  {
                      "f_port": DEVICE_PORT,
                      "frm_payload": payload,
                      "priority": PULISH_PRIORITY,
                  }
              ]
          },
          separators=(",", ":"),
      )

      for device_id in DEVICE_IDS:
          topic = F"v3/{USER}/devices/{device_id}/down/push"
          result = mqtt_client.publish(topic, message, QOS)
          if result[0] == 0:
              print(F"Publish to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}")
          else:
              print(F"Failed to publish to topic: \"{topic}\", QoS: {QOS}")
  def mqtt_on_connect_callback(client, userdata, flags, rc):
      """Report the outcome of a connection attempt.

      Prints a confirmation when ``rc`` is 0.

      Raises:
          ConnectionError: For any other ``rc``; the message names the broker
              address, the port and the return code.
      """
      broker = F"\"{PUPLIC_MQTT_TLS_ADDRESS}:{PUPLIC_MQTT_TLS_PORT}\""
      if rc != 0:
          reason = F"Could not connect to MQTT-Broker {broker}" + F"\nReturn-Code: {rc}"
          raise ConnectionError(reason)
      print(F"Connected successfully to MQTT broker {broker}")
  def mqtt_on_message_callback(client, userdata, message):
      """Print a received uplink message.

      With ``DEBUG`` set, the complete JSON document is pretty-printed.
      Otherwise a summary is printed: device and application ID, port, raw and
      decoded payload, radio settings and consumed airtime, in the order in
      which the fields appear in the message.

      This callback only logs. A message that cannot be parsed or that lacks
      expected fields is reported and skipped instead of raising.

      Args:
          client: Unused.
          userdata: Unused.
          message: Object with ``topic``, ``qos`` and ``payload`` attributes;
              ``payload`` is expected to hold a JSON document.
      """
      print("\nMessage received on topic '" + message.topic + "' with QoS = " + str(message.qos))
      try:
          parsed_json = json.loads(message.payload)
      except ValueError:
          # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
          print("Could not parse message payload as JSON.")
          return

      if DEBUG:
          print(f"Full Message (json):\n{json.dumps(parsed_json, indent=4)}")
          return

      print("Message:")
      try:
          for key, val in parsed_json["end_device_ids"].items():
              if key == "device_id":
                  print(F"\tDevice ID: {val}")
              if key == "application_ids":
                  print(F"\tApplication ID: {val['application_id']}")
          for key, val in parsed_json["uplink_message"].items():
              if key == "f_port":
                  print(F"\tPort: {val}")
              if key == "frm_payload":
                  print(F"\tPayload: {val}")
              if key == "decoded_payload":
                  print("\tDecoded Payload:")
                  try:
                      first = val['data'][0]
                      print(F"\t\tDate: {first['date']}")
                      print(F"\t\tLabel: {first['label']}")
                      print(F"\t\tValue: {first['value']}")
                  except (KeyError, IndexError, TypeError):
                      # Also covers an empty or non-list "data" entry.
                      print("\t\tCould not decode payload.")
              if key == "settings":
                  print(F"\tFrequency: {val['frequency']}")
                  print(F"\tBandwidth: {val['data_rate']['lora']['bandwidth']}")
                  print(F"\tSpreading Factor: {val['data_rate']['lora']['spreading_factor']}")
              if key == "consumed_airtime":
                  print(F"\tAirtime: {val}")
      except (AttributeError, IndexError, KeyError, TypeError):
          print("\tCould not decode message.")
  def mqtt_on_subscribe_callback(client, userdata, mid, granted_qos):
      """Print the message ID and the first granted QoS of a subscription."""
      first_qos = granted_qos[0]
      print("Subscription succesfull on message id: {}, and QoS: {}".format(mid, first_qos))
  def mqtt_on_disconnect_callback(client, userdata, rc):
      """Print the result code the client disconnected with."""
      notice = "Disconnect from MQTT with result code: {}".format(rc)
      print(notice)
  def mqtt_on_publish_callback(client, userdata, mid):
      """Print the ID of a message that has been published."""
      print("Publish succesfull on message ID: {}".format(mid))
  def init_mqtt():
      """Create the MQTT client, connect, subscribe and send the report setup.

      Steps: register the callbacks, set the credentials, enable TLS, connect
      to the configured broker, subscribe to ``v3/<USER>/devices/<id>/up`` for
      every entry of ``DEVICE_IDS`` and finally publish the report
      configuration payload to all devices.

      Returns:
          The MQTT client.

      Raises:
          ValueError: If ``DEVICE_IDS`` is empty. This is checked before any
              connection is opened so that no socket is left behind.
      """
      if not DEVICE_IDS:
          print("There are no device IDs configured for MQTT-Subscription!")
          raise ValueError

      mqtt_client = mqtt.Client("axxeo_monitoring-client")
      mqtt_client.on_connect = mqtt_on_connect_callback
      mqtt_client.on_message = mqtt_on_message_callback
      mqtt_client.on_subscribe = mqtt_on_subscribe_callback
      mqtt_client.on_disconnect = mqtt_on_disconnect_callback
      mqtt_client.on_publish = mqtt_on_publish_callback
      mqtt_client.username_pw_set(USER, PASSWORD)
      mqtt_client.tls_set()
      mqtt_client.connect(PUPLIC_MQTT_TLS_ADDRESS, PUPLIC_MQTT_TLS_PORT)

      for device_id in DEVICE_IDS:
          topic = F"v3/{USER}/devices/{device_id}/up"
          result = mqtt_client.subscribe(topic, QOS)
          # result[0] is the return code, result[1] the message ID.
          if result[0] == 0:
              print(F"Subscribe to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}")
          else:
              print(F"Failed to subsribe to topic: \"{topic}\", QoS: {QOS}")

      # Configure the report intervals on all devices right away.
      mqtt_send(mqtt_client, report_payload())
      return mqtt_client
  def init_gpio():
      """Connect to the pigpio daemon and set the ``GPIO`` pin up as an input.

      Returns:
          The connected pigpio client.

      Raises:
          ConnectionError: If the client reports that it is not connected; a
              notice is printed first.
      """
      pi = pigpio.pi()
      if pi.connected:
          pi.set_mode(GPIO, pigpio.INPUT)
          return pi
      print("The pigpio daemon is not running!")
      raise ConnectionError
  def main():
      """Run the controller: mirror the debounced GPIO input to the devices.

      After initialising GPIO and MQTT, the current input state is published
      once. The loop then publishes a new payload whenever the debounced state
      changes and services the MQTT network traffic in between.

      Exit codes: 0 on Ctrl-C, 1 on a connection error, an unresolvable broker
      address or a ValueError raised by a helper. Both clients are released in
      every case.
      """
      # Bound up front so the cleanup below can tell what was initialised.
      gpio_client = None
      mqtt_client = None
      try:
          print("Initialize GPIO")
          gpio_client = init_gpio()
          print("Initialize MQTT")
          mqtt_client = init_mqtt()
          print("\nStart Communication:")

          old_val = debounce(gpio_client)
          mqtt_send(mqtt_client, set_payload(old_val))

          while True:
              new_val = debounce(gpio_client)
              if old_val != new_val:
                  old_val = new_val
                  if old_val:
                      print("\nSwitch on windpower plant!")
                  else:
                      print("\nSwitch off windpower plant!")
                  mqtt_send(mqtt_client, set_payload(new_val))

              # loop() reports a lost connection through its return code.
              # Ignoring it would leave the program spinning without ever
              # reaching the broker again.
              status = mqtt_client.loop(10)
              if status != mqtt.MQTT_ERR_SUCCESS:
                  raise ConnectionError(F"Lost connection to MQTT broker, return code: {status}")
      except KeyboardInterrupt:
          exit(0)
      except ConnectionError as err:
          # Helpers that already printed a notice raise without arguments.
          if err.args:
              print(F"\n{err}")
          exit(1)
      except socket.gaierror:
          print(F"\nNo address associated with hostname \"{PUPLIC_MQTT_TLS_ADDRESS}\"")
          exit(1)
      except ValueError:
          # The raising helper has already printed the reason.
          exit(1)
      finally:
          if mqtt_client is not None and mqtt_client.is_connected():
              mqtt_client.disconnect()
          if gpio_client is not None and gpio_client.connected:
              gpio_client.stop()
          print("\nExit program!")


  if __name__ == "__main__":
      main()