#!/usr/bin/env python3
"""
MQTT Script to communicate with The Things Network and control digital output.
Author: Tobias Müller
Licence: GPL v3
Date: 23-12-06
Changelog:
23-12-06 - create initial program
23-12-07 - change output of payload
23-12-12 - finish publish mqtt code
"""
- from paho.mqtt import client as mqtt
- import pigpio
- from time import sleep
- from sys import exit
- import json
- import socket
- from base64 import b64encode
# ---------------------------------------------------------------------------
# Configuration
#
# NOTE: the misspelled identifiers (PUPLIC_*, PULISH_*) are referenced
# throughout this file; rename them everywhere or not at all.
# ---------------------------------------------------------------------------

# MQTT credentials. Both values are listed in the console:
# https://eu1.cloud.thethings.network/console/
#   -> Applications -> Integrations -> MQTT -> Username / Password
USER = ""
PASSWORD = ""

# MQTT broker host and port.
PUPLIC_MQTT_TLS_ADDRESS = "eu1.cloud.thethings.network"
PUPLIC_MQTT_TLS_PORT = 8883

# List of device IDs in an application.
DEVICE_IDS = ["eui-70b3d5e75e01131f"]

# Standard port for watteco devices.
DEVICE_PORT = 125

# Output of the device to control; value of 1 to 4.
DEVICE_OUTPUT = 1

# Priority to publish messages: "LOWEST", "LOW", "BELOW_NORMAL", "NORMAL",
# "ABOVE_NORMAL", "HIGH", "HIGHEST".
PULISH_PRIORITY = "NORMAL"

# Report intervals, 0-32767 each. In production, the value must be
# >= 60min/3600s to comply with the legal duty cycle in the EU at SF12.
MIN_REPORT_INTERVAL = 60
MAX_REPORT_INTERVAL = 120

# Unit of the report intervals: "sec" for seconds or "min" for minutes.
INTERVAL_UNIT = "sec"

# GPIO to detect shutdown.
GPIO = 5

# Quality of Service for MQTT.
QOS = 0

# When True, received messages are printed as full JSON.
DEBUG = False

# Note: On The Things Network's public community network a Fair Use
# Policy applies which limits the uplink airtime to 30 seconds per
# day (24 hours) per node and the downlink messages to 10
# messages per day (24 hours) per node. Higher SF (Spreading Factor)
# results in higher airtime usage and vice versa. Report intervals must
# be inside those limits.
# https://www.thethingsnetwork.org/forum/t/fair-use-policy-explained/1300
def debounce(gpio_client, pin=None, max_cycle=20, delay=0.01):
    """Read a GPIO input until its level has been stable long enough.

    The pin is sampled every ``delay`` seconds. A counter of consecutive
    identical readings is kept; any change of level resets it. Sampling stops
    as soon as ``max_cycle`` consecutive readings agree.

    Args:
        gpio_client: Object with a ``read(pin)`` method (here: ``pigpio.pi``).
        pin: GPIO number to sample. Defaults to the module constant ``GPIO``.
        max_cycle: Number of consecutive identical samples required.
        delay: Pause between two samples in seconds.

    Returns:
        ``0`` if the pin settled at a truthy (high) level, ``1`` if it settled
        at a falsy (low) level, i.e. the inverted pin level.

    Raises:
        ValueError: If ``max_cycle`` is smaller than 1.
    """
    if max_cycle < 1:
        # With no samples required the loop would never read the pin and the
        # returned level would be meaningless.
        raise ValueError("max_cycle must be at least 1")
    if pin is None:
        pin = GPIO

    active_low = 0
    active_high = 0
    while active_low < max_cycle and active_high < max_cycle:
        if gpio_client.read(pin):
            active_high += 1
            active_low = 0
        else:
            active_high = 0
            active_low += 1
        sleep(delay)
    return 0 if active_high == max_cycle else 1
def set_payload(data, output=None):
    """Build the Base64-encoded downlink frame that sets a device output.

    The frame is the hexadecimal concatenation of the fields Fctrl (1 byte,
    selected by the output number), CmdID (0x50, 1 byte), ClusterID
    (0x0006, 2 bytes) and Data (1 byte).
    NOTE(review): field names are taken from the original code; presumably
    this is the Watteco frame layout - confirm against the device manual.

    Args:
        data: Value for the Data field, 0-255 (the script sends 0 or 1).
        output: Device output to address, 1 to 4. Defaults to the module
            constant ``DEVICE_OUTPUT``.

    Returns:
        The frame as Base64 text, ready to be used as ``frm_payload``.

    Raises:
        ValueError: If ``output`` is not 1-4 or ``data`` does not fit into
            one byte.
    """
    if output is None:
        output = DEVICE_OUTPUT

    fctrl_by_output = {1: 0x11, 2: 0x31, 3: 0x51, 4: 0x71}
    # Print the reason before raising, following this file's convention for
    # configuration errors (a bare ValueError preceded by a message).
    if output not in fctrl_by_output:
        print(F"Invalid device output {output!r}: must be one of 1, 2, 3, 4!")
        raise ValueError
    if not 0 <= data <= 0xFF:
        print(F"Invalid payload data {data!r}: must be in the range 0-255!")
        raise ValueError

    # (value, number of hex digits) for every field, in frame order.
    fields = (
        (fctrl_by_output[output], 2),  # Fctrl
        (0x50, 2),                     # CmdID
        (0x06, 4),                     # ClusterID
        (data, 2),                     # Data
    )
    payload = "".join(F"{value:0{width}X}" for value, width in fields)
    return b64encode(bytes.fromhex(payload)).decode()
def report_payload(output=None, min_interval=None, max_interval=None, unit=None):
    """Build the Base64-encoded downlink frame that configures reporting.

    The frame is the hexadecimal concatenation of the fields Fctrl (selected
    by the output number), CmdID (0x06), ClusterID (0x0006), AttributeID
    (0x0000), Attribute_type (0x0010), MinReport, MaxReport and ReportChange
    (0x01). MinReport and MaxReport are 16-bit values: the interval in the
    low 15 bits, with bit 15 set when the unit is minutes.
    NOTE(review): field names are taken from the original code; presumably
    this is the Watteco frame layout - confirm against the device manual.

    Args:
        output: Device output to address, 1 to 4. Defaults to
            ``DEVICE_OUTPUT``.
        min_interval: Minimum report interval, 0-32767. Defaults to
            ``MIN_REPORT_INTERVAL``.
        max_interval: Maximum report interval, 0-32767. Defaults to
            ``MAX_REPORT_INTERVAL``.
        unit: ``"sec"`` or ``"min"``. Defaults to ``INTERVAL_UNIT``.

    Returns:
        The frame as Base64 text, ready to be used as ``frm_payload``.

    Raises:
        ValueError: If the output, the unit or an interval is out of range.
    """
    if output is None:
        output = DEVICE_OUTPUT
    if min_interval is None:
        min_interval = MIN_REPORT_INTERVAL
    if max_interval is None:
        max_interval = MAX_REPORT_INTERVAL
    if unit is None:
        unit = INTERVAL_UNIT

    fctrl_by_output = {1: 0x11, 2: 0x31, 3: 0x51, 4: 0x71}
    unit_flags = {"sec": 0x0000, "min": 0x8000}

    # Print the reason before raising, following this file's convention for
    # configuration errors (a bare ValueError preceded by a message).
    if output not in fctrl_by_output:
        print(F"Invalid device output {output!r}: must be one of 1, 2, 3, 4!")
        raise ValueError
    if unit not in unit_flags:
        print(F"Invalid interval unit {unit!r}: must be \"sec\" or \"min\"!")
        raise ValueError
    for label, interval in (("minimum", min_interval), ("maximum", max_interval)):
        # Bit 15 carries the unit, so only 15 bits are left for the interval.
        if not 0 <= interval <= 0x7FFF:
            print(F"Invalid {label} report interval {interval!r}: must be in the range 0-32767!")
            raise ValueError

    unit_flag = unit_flags[unit]
    # (value, number of hex digits) for every field, in frame order.
    fields = (
        (fctrl_by_output[output], 2),  # Fctrl
        (0x06, 2),                     # CmdID
        (0x06, 4),                     # ClusterID
        (0x00, 4),                     # AttributeID
        (0x10, 4),                     # Attribute_type
        (unit_flag | min_interval, 4),  # MinReport
        (unit_flag | max_interval, 4),  # MaxReport
        (0x01, 2),                     # ReportChange
    )
    payload = "".join(F"{value:0{width}X}" for value, width in fields)
    return b64encode(bytes.fromhex(payload)).decode()
def mqtt_send(mqtt_client, payload):
    """Publish a downlink message with ``payload`` to every configured device.

    One message is published per entry of ``DEVICE_IDS`` on the topic
    ``v3/<USER>/devices/<device id>/down/push``, using ``DEVICE_PORT`` as
    ``f_port``, ``PULISH_PRIORITY`` as priority and ``QOS`` as quality of
    service. The outcome of every publish call is printed.

    Args:
        mqtt_client: Client object providing ``publish(topic, message, qos)``.
        payload: Base64 text used as ``frm_payload``.

    Raises:
        ValueError: If ``DEVICE_IDS`` is empty.
    """
    if not DEVICE_IDS:
        print("There are no device IDs configured for MQTT publishing!")
        raise ValueError

    # The message does not depend on the device, so build it once. json.dumps
    # takes care of quoting; the compact separators keep the wire format
    # identical to the previous hand-built string.
    message = json.dumps(
        {
            "downlinks": [
                {
                    "f_port": DEVICE_PORT,
                    "frm_payload": payload,
                    "priority": PULISH_PRIORITY,
                }
            ]
        },
        separators=(",", ":"),
    )

    for device_id in DEVICE_IDS:
        topic = F"v3/{USER}/devices/{device_id}/down/push"
        result = mqtt_client.publish(topic, message, QOS)
        # result[0] is the return code (0 means success), result[1] the
        # message ID.
        if result[0] == 0:
            print(F"Publish to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}")
        else:
            print(F"Failed to publish to topic: \"{topic}\", QoS: {QOS}")
def mqtt_on_connect_callback(client, userdata, flags, rc):
    """Report the result of a connection attempt to the MQTT broker.

    Prints a confirmation when ``rc`` is 0.

    Raises:
        ConnectionError: If ``rc`` is not 0; the message names the broker and
            the return code.
    """
    broker = F"{PUPLIC_MQTT_TLS_ADDRESS}:{PUPLIC_MQTT_TLS_PORT}"
    if rc != 0:
        reason = F"Could not connect to MQTT-Broker \"{broker}\"" + F"\nReturn-Code: {rc}"
        raise ConnectionError(reason)
    print(F"Connected successfully to MQTT broker \"{broker}\"")
def mqtt_on_message_callback(client, userdata, message):
    """Print a received MQTT message.

    The payload is parsed as JSON. With ``DEBUG`` enabled the whole document
    is pretty-printed; otherwise selected fields of the ``end_device_ids``
    and ``uplink_message`` sections are printed in the order in which they
    appear in the message.

    Messages that are not valid JSON, or that lack expected fields, are
    reported and skipped. An exception escaping from here would otherwise
    terminate the program because of one unexpected message.

    Args:
        client: Unused.
        userdata: Unused.
        message: Received message; ``topic``, ``qos`` and ``payload`` are read.
    """
    print("\nMessage received on topic '" + message.topic + "' with QoS = " + str(message.qos))
    try:
        parsed_json = json.loads(message.payload)
    except ValueError as err:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(F"Could not parse message payload as JSON: {err}")
        return

    if DEBUG:
        print(f"Full Message (json):\n{json.dumps(parsed_json, indent=4)}")
        return

    print("Message:")
    if not isinstance(parsed_json, dict):
        print("\tUnexpected message format.")
        return

    end_device_ids = parsed_json.get("end_device_ids")
    if isinstance(end_device_ids, dict):
        for key, val in end_device_ids.items():
            if key == "device_id":
                print(F"\tDevice ID: {val}")
            elif key == "application_ids":
                try:
                    print(F"\tApplication ID: {val['application_id']}")
                except (KeyError, TypeError):
                    print("\tApplication ID: not available")

    uplink_message = parsed_json.get("uplink_message")
    if isinstance(uplink_message, dict):
        for key, val in uplink_message.items():
            if key == "f_port":
                print(F"\tPort: {val}")
            elif key == "frm_payload":
                print(F"\tPayload: {val}")
            elif key == "decoded_payload":
                print("\tDecoded Payload:")
                try:
                    first_entry = val['data'][0]
                    print(F"\t\tDate: {first_entry['date']}")
                    print(F"\t\tLabel: {first_entry['label']}")
                    print(F"\t\tValue: {first_entry['value']}")
                except (KeyError, IndexError, TypeError):
                    # Also covers an empty "data" list and non-dict values.
                    print("\t\tCould not decode payload.")
            elif key == "settings":
                try:
                    print(F"\tFrequency: {val['frequency']}")
                    lora = val['data_rate']['lora']
                    print(F"\tBandwidth: {lora['bandwidth']}")
                    print(F"\tSpreading Factor: {lora['spreading_factor']}")
                except (KeyError, TypeError):
                    print("\tCould not read radio settings.")
            elif key == "consumed_airtime":
                print(F"\tAirtime: {val}")
def mqtt_on_subscribe_callback(client, userdata, mid, granted_qos):
    """Print the message ID and the first granted QoS of a subscription."""
    first_granted = granted_qos[0]
    report = "Subscription succesfull on message id: {}, and QoS: {}".format(mid, first_granted)
    print(report)
def mqtt_on_disconnect_callback(client, userdata, rc):
    """Print the result code passed to the disconnect callback."""
    report = "Disconnect from MQTT with result code: {}".format(rc)
    print(report)
def mqtt_on_publish_callback(client, userdata, mid):
    """Print the ID of a message passed to the publish callback."""
    report = "Publish succesfull on message ID: {}".format(mid)
    print(report)
def init_mqtt():
    """Create the MQTT client, connect, subscribe and configure reporting.

    Steps:
      1. Validate the configuration and build the report-configuration
         payload. This happens before any network activity so that a
         configuration error cannot leave an open connection behind.
      2. Create the client, register the callbacks, set the credentials,
         enable TLS and connect to the broker.
      3. Subscribe to ``v3/<USER>/devices/<device id>/up`` for every entry
         of ``DEVICE_IDS`` and print the outcome.
      4. Send the report-configuration payload to all devices.

    Returns:
        The connected MQTT client.

    Raises:
        ValueError: If ``DEVICE_IDS`` is empty or the report configuration
            is invalid.
    """
    if not DEVICE_IDS:
        print("There are no device IDs configured for MQTT-Subscription!")
        raise ValueError
    payload = report_payload()

    client_id = "axxeo_monitoring-client"
    # paho-mqtt 2.x requires the callback API version as first argument. The
    # callbacks in this file use the version 1 signatures. Older releases do
    # not know CallbackAPIVersion and take the client ID first.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        mqtt_client = mqtt.Client(client_id)
    else:
        mqtt_client = mqtt.Client(api_versions.VERSION1, client_id)

    mqtt_client.on_connect = mqtt_on_connect_callback
    mqtt_client.on_message = mqtt_on_message_callback
    mqtt_client.on_subscribe = mqtt_on_subscribe_callback
    mqtt_client.on_disconnect = mqtt_on_disconnect_callback
    mqtt_client.on_publish = mqtt_on_publish_callback
    mqtt_client.username_pw_set(USER, PASSWORD)
    mqtt_client.tls_set()
    mqtt_client.connect(PUPLIC_MQTT_TLS_ADDRESS, PUPLIC_MQTT_TLS_PORT)

    try:
        for device_id in DEVICE_IDS:
            topic = F"v3/{USER}/devices/{device_id}/up"
            result = mqtt_client.subscribe(topic, QOS)
            # result[0] is the return code (0 means success), result[1] the
            # message ID.
            if result[0] == 0:
                print(F"Subscribe to topic: \"{topic}\", QoS: {QOS}, message ID: {result[1]}")
            else:
                print(F"Failed to subsribe to topic: \"{topic}\", QoS: {QOS}")
        mqtt_send(mqtt_client, payload)
    except Exception:
        # The client is not handed to the caller on failure, so nobody else
        # could close the connection.
        mqtt_client.disconnect()
        raise
    return mqtt_client
def init_gpio():
    """Connect to the pigpio daemon and set pin ``GPIO`` to input mode.

    Returns:
        The connected ``pigpio.pi`` instance.

    Raises:
        ConnectionError: If no connection to the daemon was established; a
            hint is printed beforehand.
    """
    pi = pigpio.pi()
    if pi.connected:
        pi.set_mode(GPIO, pigpio.INPUT)
        return pi
    print("The pigpio daemon is not running!")
    raise ConnectionError
def main():
    """Forward changes of the monitored GPIO input to the devices via MQTT.

    After initialising GPIO and MQTT, the debounced input state is sent
    once. Then the input is polled in an endless loop: every change is
    printed and sent, and the MQTT network loop is serviced between polls.

    Exit codes: 0 after Ctrl+C, 1 after a connection, name-resolution or
    configuration/value error. The MQTT connection and the pigpio connection
    are closed on every exit path.
    """
    # Initialised up front so the cleanup code can tell which resources
    # were actually acquired.
    gpio_client = None
    mqtt_client = None
    try:
        print("Initialize GPIO")
        gpio_client = init_gpio()
        print("Initialize MQTT")
        mqtt_client = init_mqtt()
        print("\nStart Communication:")
        old_val = debounce(gpio_client)
        mqtt_send(mqtt_client, set_payload(old_val))
        while True:
            new_val = debounce(gpio_client)
            if old_val != new_val:
                old_val = new_val
                if old_val:
                    print("\nSwitch on windpower plant!")
                else:
                    print("\nSwitch off windpower plant!")
                mqtt_send(mqtt_client, set_payload(new_val))
            # loop() returns 0 on success and an error code otherwise, e.g.
            # after the connection was lost. Nothing here reconnects, so
            # carrying on would leave the program running without a link.
            loop_rc = mqtt_client.loop(10)
            if loop_rc != 0:
                raise ConnectionError(
                    F"Lost connection to MQTT broker \"{PUPLIC_MQTT_TLS_ADDRESS}\" "
                    F"(error code: {loop_rc})")
    except KeyboardInterrupt:
        exit(0)
    except socket.gaierror:
        print(F"\nNo address associated with hostname \"{PUPLIC_MQTT_TLS_ADDRESS}\"")
        exit(1)
    except (ConnectionError, ValueError) as err:
        # Some raisers print their own message and raise a bare exception;
        # others only carry a message. Print it when there is one so the
        # reason for the exit is never lost.
        if str(err):
            print(F"\n{err}")
        exit(1)
    finally:
        if mqtt_client is not None and mqtt_client.is_connected():
            mqtt_client.disconnect()
        if gpio_client is not None and gpio_client.connected:
            gpio_client.stop()
        print("\nExit program!")


if __name__ == "__main__":
    main()
|