#!/usr/bin/env python3 """ MQTT Script to communicate with The Things Network and control digital output. Author: Tobias Müller Licence: GPL v3 Date: 23-12-06 Changelog: 23-12-06 - create initial program 23-12-07 - change output of payload 23-12-12 - finish publish mqtt code """ from paho.mqtt import client as mqtt import pigpio from time import sleep from sys import exit import json import socket from base64 import b64encode USER="" # https://eu1.cloud.thethings.network/console/ -> Applications -> Integrations -> MQTT -> Username PASSWORD="" # https://eu1.cloud.thethings.network/console/ -> Applications -> Integrations -> MQTT -> Password PUPLIC_MQTT_TLS_ADDRESS = "eu1.cloud.thethings.network" PUPLIC_MQTT_TLS_PORT = 8883 DEVICE_IDS = ["eui-70b3d5e75e01131f"] # list of device IDs in an application DEVICE_PORT = 125 # standard port for watteco devices DEVICE_OUTPUT = 1 # value of 1 to 4 PULISH_PRIORITY = "NORMAL" # priority to puplish messages: "LOWEST", "LOW", "BELOW_NORMAL", "NORMAL", "ABOVE_NORMAL", "HIGH", "HIGHEST" MIN_REPORT_INTERVAL = 60 # 0-32767; in production, value must be >= 60min/3600s to comply with the legal duty cycle in the EU at SF12 MAX_REPORT_INTERVAL = 120 # 0-32767; in production, value must be >= 60min/3600s to comply with the legal duty cycle in the EU at SF12 INTERVAL_UNIT = "sec" # "sec" for seconds or "min" for minutes GPIO = 5 # GPIO to detect shutdown QOS = 0 # Quality of Service for MQTT DEBUG = False # Note: On The Things Network’s public community network a Fair Use # Policy applies which limits the uplink airtime to 30 seconds per # day (24 hours) per node and the downlink messages to 10 # messages per day (24 hours) per node. Higher SF (Spreading Factor) # results in higher airtime usage and vice versa. Report intervals must # be inside those limits. # https://www.thethingsnetwork.org/forum/t/fair-use-policy-explained/1300 def debounce(gpio_client): active_low = 0 active_high = 0 max_cycle = 20 delay = 0.01 while active_low < max_cycle and active_high < max_cycle: if gpio_client.read(GPIO): active_high += 1 active_low = 0 else: active_high = 0 active_low += 1 sleep(delay) return 0 if active_high == max_cycle else 1 def set_payload(data): frame_setting = {"Fctrl": [{1: 0x11, 2: 0x31, 3: 0x51, 4:0x71}, 2], "CmdID": [0x50, 2], "ClusterID": [0x06, 4], "Data": [2]} payload = "" for key, val in frame_setting.items(): if key == "Fctrl": payload += F"{val[0].get(DEVICE_OUTPUT):0{val[-1]}X}" elif key == "CmdID" or key == "ClusterID": payload += F"{val[0]:0{val[-1]}X}" elif key == "Data": payload += F"{data:0{val[-1]}X}" return b64encode(bytes.fromhex(payload)).decode() def report_payload(): frame_setting = {"Fctrl": [{1: 0x11, 2: 0x31, 3: 0x51, 4:0x71}, 2], "CmdID": [0x06, 2], "ClusterID": [0x06, 4], "AttributeID": [0x00, 4], "Attribute_type": [0x10, 4], "MinReport": [{"sec": 0x0000, "min": 0x8000}, 4], "MaxReport": [{"sec": 0x0000, "min": 0x8000}, 4], "ReportChange": [0x01, 2]} payload = "" for key, val in frame_setting.items(): if key == "Fctrl": payload += F"{val[0].get(DEVICE_OUTPUT):0{val[-1]}X}" elif (key == "CmdID" or key == "ClusterID" or key == "AttributeID" or key == "Attribute_type" or key == "ReportChange"): payload += F"{val[0]:0{val[-1]}X}" elif key == "MinReport": payload += F"{val[0].get(INTERVAL_UNIT)|MIN_REPORT_INTERVAL:0{val[-1]}X}" elif key == "MaxReport": payload += F"{val[0].get(INTERVAL_UNIT)|MAX_REPORT_INTERVAL:0{val[-1]}X}" return b64encode(bytes.fromhex(payload)).decode() def mqtt_send(mqtt_client, payload): if DEVICE_IDS: for id in DEVICE_IDS: topic = "v3/" + USER + "/devices/" + id + "/down/push" message = '{"downlinks":[{"f_port":' + str(DEVICE_PORT) + ',"frm_payload":"' + payload + '","priority":"' + PULISH_PRIORITY + '"}]}' result = mqtt_client.publish(topic, message, QOS) if result[0] == 0: print(F"Publish to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}") else: print(F"Failed to publish to topic: \"{topic}\", QoS: {QOS}") else: print("There are no device IDs configured for MQTT-Subscription!") raise ValueError def mqtt_on_connect_callback(client, userdata, flags, rc): if rc == 0: print(F"Connected successfully to MQTT broker \"{PUPLIC_MQTT_TLS_ADDRESS}:{PUPLIC_MQTT_TLS_PORT}\"") else: raise ConnectionError(F"Could not connect to MQTT-Broker \"{PUPLIC_MQTT_TLS_ADDRESS}:{PUPLIC_MQTT_TLS_PORT}\"" + F"\nReturn-Code: {rc}") def mqtt_on_message_callback(client, userdata, message): print("\nMessage received on topic '" + message.topic + "' with QoS = " + str(message.qos)) parsed_json = json.loads(message.payload) if DEBUG: print(f"Full Message (json):\n{json.dumps(parsed_json, indent=4)}") else: print("Message:") for key, val in parsed_json["end_device_ids"].items(): if key == "device_id": print(F"\tDevice ID: {val}") if key == "application_ids": print(F"\tApplication ID: {val['application_id']}") for key, val in parsed_json["uplink_message"].items(): if key == "f_port": print(F"\tPort: {val}") if key == "frm_payload": print(F"\tPayload: {val}") if key == "decoded_payload": print(F"\tDecoded Payload:") try: print(F"\t\tDate: {val['data'][0]['date']}") print(F"\t\tLabel: {val['data'][0]['label']}") print(F"\t\tValue: {val['data'][0]['value']}") except KeyError: print("\t\tCould not decode payload.") if key == "settings": print(F"\tFrequency: {val['frequency']}") print(F"\tBandwidth: {val['data_rate']['lora']['bandwidth']}") print(F"\tSpreading Factor: {val['data_rate']['lora']['spreading_factor']}") if key == "consumed_airtime": print(F"\tAirtime: {val}") def mqtt_on_subscribe_callback(client, userdata, mid, granted_qos): print(F"Subscription succesfull on message id: {mid}, and QoS: {granted_qos[0]}") def mqtt_on_disconnect_callback(client, userdata, rc): print(F"Disconnect from MQTT with result code: {rc}") def mqtt_on_publish_callback(client, userdata, mid): print(F"Publish succesfull on message ID: {mid}") def init_mqtt(): mqtt_client = mqtt.Client(f"axxeo_monitoring-client") mqtt_client.on_connect = mqtt_on_connect_callback mqtt_client.on_message = mqtt_on_message_callback mqtt_client.on_subscribe = mqtt_on_subscribe_callback mqtt_client.on_disconnect = mqtt_on_disconnect_callback mqtt_client.on_publish = mqtt_on_publish_callback mqtt_client.username_pw_set(USER, PASSWORD) mqtt_client.tls_set() mqtt_client.connect(PUPLIC_MQTT_TLS_ADDRESS, PUPLIC_MQTT_TLS_PORT) if DEVICE_IDS: for id in DEVICE_IDS: topic = "v3/" + USER + "/devices/" + id + "/up" result = mqtt_client.subscribe(topic, QOS) if result[0] == 0: print(F"Subscribe to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}") else: print(F"Failed to subsribe to topic: \"{topic}\", QoS: {QOS}") else: print("There are no device IDs configured for MQTT-Subscription!") raise ValueError payload = report_payload() mqtt_send(mqtt_client, payload) return mqtt_client def init_gpio(): gpio_client = pigpio.pi() if not gpio_client.connected: print("The pigpio daemon is not running!") raise ConnectionError gpio_client.set_mode(GPIO, pigpio.INPUT) return gpio_client def main(): try: print("Initialize GPIO") gpio_client = init_gpio() print("Initialize MQTT") mqtt_client = init_mqtt() print("\nStart Communication:") old_val = debounce(gpio_client) payload = set_payload(old_val) mqtt_send(mqtt_client, payload) while True: new_val = debounce(gpio_client) if old_val != new_val: old_val = new_val if old_val: print("\nSwitch on windpower plant!") else: print("\nSwitch off windpower plant!") payload = set_payload(new_val) mqtt_send(mqtt_client, payload) mqtt_client.loop(10) except KeyboardInterrupt: exit(0) except ConnectionError: exit(1) except socket.gaierror: print(F"\nNo address associated with hostname \"{PUPLIC_MQTT_TLS_ADDRESS}\"") exit(1) except ValueError: exit(1) finally: if "mqtt_client" in locals() and mqtt_client.is_connected(): mqtt_client.disconnect() if "gpio_client" in locals() and gpio_client.connected: gpio_client.stop() print("\nExit program!") if __name__ == "__main__": main()