|
@@ -0,0 +1,247 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+
|
|
|
+"""
|
|
|
+MQTT Script to communicate with The Things Network and control digital output.
|
|
|
+
|
|
|
+Author: Tobias Müller
|
|
|
+Licence: GPL v3
|
|
|
+Date: 23-12-06
|
|
|
+
|
|
|
+Changelog:
|
|
|
+ 23-12-06 - create initial program
|
|
|
+ 23-12-07 - change output of payload
|
|
|
+ 23-12-12 - finish publish mqtt code
|
|
|
+"""
|
|
|
+
|
|
|
+from paho.mqtt import client as mqtt
|
|
|
+import pigpio
|
|
|
+from time import sleep
|
|
|
+from sys import exit
|
|
|
+import json
|
|
|
+import socket
|
|
|
+from base64 import b64encode
|
|
|
+
|
|
|
+USER="" # https://eu1.cloud.thethings.network/console/ -> Applications -> Integrations -> MQTT -> Username
|
|
|
+PASSWORD="" # https://eu1.cloud.thethings.network/console/ -> Applications -> Integrations -> MQTT -> Password
|
|
|
+PUPLIC_MQTT_TLS_ADDRESS = "eu1.cloud.thethings.network"
|
|
|
+PUPLIC_MQTT_TLS_PORT = 8883
|
|
|
+DEVICE_IDS = ["eui-70b3d5e75e01131f"] # list of device IDs in an application
|
|
|
+DEVICE_PORT = 125 # standard port for watteco devices
|
|
|
+DEVICE_OUTPUT = 1 # value of 1 to 4
|
|
|
+PULISH_PRIORITY = "NORMAL" # priority to puplish messages: "LOWEST", "LOW", "BELOW_NORMAL", "NORMAL", "ABOVE_NORMAL", "HIGH", "HIGHEST"
|
|
|
+MIN_REPORT_INTERVAL = 60 # 0-32767; in production, value must be >= 60min/3600s to comply with the legal duty cycle in the EU at SF12
|
|
|
+MAX_REPORT_INTERVAL = 120 # 0-32767; in production, value must be >= 60min/3600s to comply with the legal duty cycle in the EU at SF12
|
|
|
+INTERVAL_UNIT = "sec" # "sec" for seconds or "min" for minutes
|
|
|
+GPIO = 5 # GPIO to detect shutdown
|
|
|
+QOS = 0 # Quality of Service for MQTT
|
|
|
+DEBUG = False
|
|
|
+
|
|
|
+# Note: On The Things Network’s public community network a Fair Use
|
|
|
+# Policy applies which limits the uplink airtime to 30 seconds per
|
|
|
+# day (24 hours) per node and the downlink messages to 10
|
|
|
+# messages per day (24 hours) per node. Higher SF (Spreading Factor)
|
|
|
+# results in higher airtime usage and vice versa. Report intervals must
|
|
|
+# be inside those limits.
|
|
|
+# https://www.thethingsnetwork.org/forum/t/fair-use-policy-explained/1300
|
|
|
+
|
|
|
+def debounce(gpio_client):
|
|
|
+ active_low = 0
|
|
|
+ active_high = 0
|
|
|
+ max_cycle = 20
|
|
|
+ delay = 0.01
|
|
|
+ while active_low < max_cycle and active_high < max_cycle:
|
|
|
+ if gpio_client.read(GPIO):
|
|
|
+ active_high += 1
|
|
|
+ active_low = 0
|
|
|
+ else:
|
|
|
+ active_high = 0
|
|
|
+ active_low += 1
|
|
|
+ sleep(delay)
|
|
|
+ return 0 if active_high == max_cycle else 1
|
|
|
+
|
|
|
+
|
|
|
+def set_payload(data):
|
|
|
+ frame_setting = {"Fctrl": [{1: 0x11, 2: 0x31, 3: 0x51, 4:0x71}, 2],
|
|
|
+ "CmdID": [0x50, 2],
|
|
|
+ "ClusterID": [0x06, 4],
|
|
|
+ "Data": [2]}
|
|
|
+
|
|
|
+ payload = ""
|
|
|
+ for key, val in frame_setting.items():
|
|
|
+ if key == "Fctrl":
|
|
|
+ payload += F"{val[0].get(DEVICE_OUTPUT):0{val[-1]}X}"
|
|
|
+ elif key == "CmdID" or key == "ClusterID":
|
|
|
+ payload += F"{val[0]:0{val[-1]}X}"
|
|
|
+ elif key == "Data":
|
|
|
+ payload += F"{data:0{val[-1]}X}"
|
|
|
+
|
|
|
+ return b64encode(bytes.fromhex(payload)).decode()
|
|
|
+
|
|
|
+
|
|
|
+def report_payload():
|
|
|
+ frame_setting = {"Fctrl": [{1: 0x11, 2: 0x31, 3: 0x51, 4:0x71}, 2],
|
|
|
+ "CmdID": [0x06, 2],
|
|
|
+ "ClusterID": [0x06, 4],
|
|
|
+ "AttributeID": [0x00, 4],
|
|
|
+ "Attribute_type": [0x10, 4],
|
|
|
+ "MinReport": [{"sec": 0x0000, "min": 0x8000}, 4],
|
|
|
+ "MaxReport": [{"sec": 0x0000, "min": 0x8000}, 4],
|
|
|
+ "ReportChange": [0x01, 2]}
|
|
|
+
|
|
|
+ payload = ""
|
|
|
+ for key, val in frame_setting.items():
|
|
|
+ if key == "Fctrl":
|
|
|
+ payload += F"{val[0].get(DEVICE_OUTPUT):0{val[-1]}X}"
|
|
|
+ elif (key == "CmdID" or key == "ClusterID" or key == "AttributeID" or
|
|
|
+ key == "Attribute_type" or key == "ReportChange"):
|
|
|
+ payload += F"{val[0]:0{val[-1]}X}"
|
|
|
+ elif key == "MinReport":
|
|
|
+ payload += F"{val[0].get(INTERVAL_UNIT)|MIN_REPORT_INTERVAL:0{val[-1]}X}"
|
|
|
+ elif key == "MaxReport":
|
|
|
+ payload += F"{val[0].get(INTERVAL_UNIT)|MAX_REPORT_INTERVAL:0{val[-1]}X}"
|
|
|
+
|
|
|
+ return b64encode(bytes.fromhex(payload)).decode()
|
|
|
+
|
|
|
+
|
|
|
+def mqtt_send(mqtt_client, payload):
|
|
|
+ if DEVICE_IDS:
|
|
|
+ for id in DEVICE_IDS:
|
|
|
+ topic = "v3/" + USER + "/devices/" + id + "/down/push"
|
|
|
+ message = '{"downlinks":[{"f_port":' + str(DEVICE_PORT) + ',"frm_payload":"' + payload + '","priority":"' + PULISH_PRIORITY + '"}]}'
|
|
|
+ result = mqtt_client.publish(topic, message, QOS)
|
|
|
+ if result[0] == 0:
|
|
|
+ print(F"Publish to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}")
|
|
|
+ else:
|
|
|
+ print(F"Failed to publish to topic: \"{topic}\", QoS: {QOS}")
|
|
|
+ else:
|
|
|
+ print("There are no device IDs configured for MQTT-Subscription!")
|
|
|
+ raise ValueError
|
|
|
+
|
|
|
+
|
|
|
+def mqtt_on_connect_callback(client, userdata, flags, rc):
|
|
|
+ if rc == 0:
|
|
|
+ print(F"Connected successfully to MQTT broker \"{PUPLIC_MQTT_TLS_ADDRESS}:{PUPLIC_MQTT_TLS_PORT}\"")
|
|
|
+ else:
|
|
|
+ raise ConnectionError(F"Could not connect to MQTT-Broker \"{PUPLIC_MQTT_TLS_ADDRESS}:{PUPLIC_MQTT_TLS_PORT}\"" +
|
|
|
+ F"\nReturn-Code: {rc}")
|
|
|
+
|
|
|
+
|
|
|
+def mqtt_on_message_callback(client, userdata, message):
|
|
|
+ print("\nMessage received on topic '" + message.topic + "' with QoS = " + str(message.qos))
|
|
|
+ parsed_json = json.loads(message.payload)
|
|
|
+ if DEBUG:
|
|
|
+ print(f"Full Message (json):\n{json.dumps(parsed_json, indent=4)}")
|
|
|
+ else:
|
|
|
+ print("Message:")
|
|
|
+ for key, val in parsed_json["end_device_ids"].items():
|
|
|
+ if key == "device_id":
|
|
|
+ print(F"\tDevice ID: {val}")
|
|
|
+ if key == "application_ids":
|
|
|
+ print(F"\tApplication ID: {val['application_id']}")
|
|
|
+ for key, val in parsed_json["uplink_message"].items():
|
|
|
+ if key == "f_port":
|
|
|
+ print(F"\tPort: {val}")
|
|
|
+ if key == "frm_payload":
|
|
|
+ print(F"\tPayload: {val}")
|
|
|
+ if key == "decoded_payload":
|
|
|
+ print(F"\tDecoded Payload:")
|
|
|
+ try:
|
|
|
+ print(F"\t\tDate: {val['data'][0]['date']}")
|
|
|
+ print(F"\t\tLabel: {val['data'][0]['label']}")
|
|
|
+ print(F"\t\tValue: {val['data'][0]['value']}")
|
|
|
+ except KeyError:
|
|
|
+ print("\t\tCould not decode payload.")
|
|
|
+ if key == "settings":
|
|
|
+ print(F"\tFrequency: {val['frequency']}")
|
|
|
+ print(F"\tBandwidth: {val['data_rate']['lora']['bandwidth']}")
|
|
|
+ print(F"\tSpreading Factor: {val['data_rate']['lora']['spreading_factor']}")
|
|
|
+ if key == "consumed_airtime":
|
|
|
+ print(F"\tAirtime: {val}")
|
|
|
+
|
|
|
+def mqtt_on_subscribe_callback(client, userdata, mid, granted_qos):
|
|
|
+ print(F"Subscription succesfull on message id: {mid}, and QoS: {granted_qos[0]}")
|
|
|
+
|
|
|
+
|
|
|
+def mqtt_on_disconnect_callback(client, userdata, rc):
|
|
|
+ print(F"Disconnect from MQTT with result code: {rc}")
|
|
|
+
|
|
|
+
|
|
|
+def mqtt_on_publish_callback(client, userdata, mid):
|
|
|
+ print(F"Publish succesfull on message ID: {mid}")
|
|
|
+
|
|
|
+
|
|
|
+def init_mqtt():
|
|
|
+ mqtt_client = mqtt.Client(f"axxeo_monitoring-client")
|
|
|
+ mqtt_client.on_connect = mqtt_on_connect_callback
|
|
|
+ mqtt_client.on_message = mqtt_on_message_callback
|
|
|
+ mqtt_client.on_subscribe = mqtt_on_subscribe_callback
|
|
|
+ mqtt_client.on_disconnect = mqtt_on_disconnect_callback
|
|
|
+ mqtt_client.on_publish = mqtt_on_publish_callback
|
|
|
+ mqtt_client.username_pw_set(USER, PASSWORD)
|
|
|
+ mqtt_client.tls_set()
|
|
|
+ mqtt_client.connect(PUPLIC_MQTT_TLS_ADDRESS, PUPLIC_MQTT_TLS_PORT)
|
|
|
+ if DEVICE_IDS:
|
|
|
+ for id in DEVICE_IDS:
|
|
|
+ topic = "v3/" + USER + "/devices/" + id + "/up"
|
|
|
+ result = mqtt_client.subscribe(topic, QOS)
|
|
|
+ if result[0] == 0:
|
|
|
+ print(F"Subscribe to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}")
|
|
|
+ else:
|
|
|
+ print(F"Failed to subsribe to topic: \"{topic}\", QoS: {QOS}")
|
|
|
+ else:
|
|
|
+ print("There are no device IDs configured for MQTT-Subscription!")
|
|
|
+ raise ValueError
|
|
|
+ payload = report_payload()
|
|
|
+ mqtt_send(mqtt_client, payload)
|
|
|
+ return mqtt_client
|
|
|
+
|
|
|
+
|
|
|
+def init_gpio():
|
|
|
+ gpio_client = pigpio.pi()
|
|
|
+ if not gpio_client.connected:
|
|
|
+ print("The pigpio daemon is not running!")
|
|
|
+ raise ConnectionError
|
|
|
+ gpio_client.set_mode(GPIO, pigpio.INPUT)
|
|
|
+ return gpio_client
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ try:
|
|
|
+ print("Initialize GPIO")
|
|
|
+ gpio_client = init_gpio()
|
|
|
+ print("Initialize MQTT")
|
|
|
+ mqtt_client = init_mqtt()
|
|
|
+ print("\nStart Communication:")
|
|
|
+ old_val = debounce(gpio_client)
|
|
|
+ payload = set_payload(old_val)
|
|
|
+ mqtt_send(mqtt_client, payload)
|
|
|
+ while True:
|
|
|
+ new_val = debounce(gpio_client)
|
|
|
+ if old_val != new_val:
|
|
|
+ old_val = new_val
|
|
|
+ if old_val:
|
|
|
+ print("\nSwitch on windpower plant!")
|
|
|
+ else:
|
|
|
+ print("\nSwitch off windpower plant!")
|
|
|
+ payload = set_payload(new_val)
|
|
|
+ mqtt_send(mqtt_client, payload)
|
|
|
+ mqtt_client.loop(10)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ exit(0)
|
|
|
+ except ConnectionError:
|
|
|
+ exit(1)
|
|
|
+ except socket.gaierror:
|
|
|
+ print(F"\nNo address associated with hostname \"{PUPLIC_MQTT_TLS_ADDRESS}\"")
|
|
|
+ exit(1)
|
|
|
+ except ValueError:
|
|
|
+ exit(1)
|
|
|
+ finally:
|
|
|
+ if "mqtt_client" in locals() and mqtt_client.is_connected():
|
|
|
+ mqtt_client.disconnect()
|
|
|
+ if "gpio_client" in locals() and gpio_client.connected:
|
|
|
+ gpio_client.stop()
|
|
|
+ print("\nExit program!")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|