From ccc629b92b15c280b980c1e293c673107a733342 Mon Sep 17 00:00:00 2001 From: Martijn Braam Date: Sun, 14 Nov 2021 02:42:26 +0100 Subject: [PATCH] proxy: mqtt: make subscribe topic configurable The proxy now subscribes to the same topic as set for sending by default if allow-writes is enabled. To have a seperate subcription as the sending topic use the topic-subscribe setting. The protocol is switched to MQTTv5 to be able to use NoLocal which makes the broker not send the messages back to the sender when the sending and receiving topic is the same. --- docs/proxy.rst | 10 +++++++- openswitcher_proxy/frontend_mqtt.py | 40 +++++++++++++++++++---------- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/docs/proxy.rst b/docs/proxy.rst index 78537c6..98a9a81 100644 --- a/docs/proxy.rst +++ b/docs/proxy.rst @@ -172,7 +172,9 @@ The MQTT frontend will connect to an MQTT server and will sync the hardware stat connect = "localhost:1883" auth = false hardware = "mini" + allow-writes = false topic = "atem/{hardware}/{field}" + topic-subscribe = "atem/{hardware}/set/{field}" The proxy will send a new message the the configured MQTT topic every time the state of one of the connected switchers changes. There's also a special field generated called `status` that contains the connection state @@ -180,4 +182,10 @@ to the hardware on the other side of the proxy. The `topic` setting defines which topic the messages will go to. In this path you can use the `hardware` variable which will be the label of the switcher that generated the event and the `field` variable which will be the name of the -changed field in the same format as the `The HTTP API frontend`_ above. \ No newline at end of file +changed field in the same format as the `The HTTP API frontend`_ above. + +The `topic-subscribe` is an optional setting that sets the path the proxy will subscribe to the MQTT broker to receive +messages back. If this is not set the subscribe topic will be the same as the `topic` setting used to send messages. + +The `allow-writes` setting defaults to false. If this setting is changed to true it will make the proxy subscribe to +a topic and allow changing the switcher state my sending MQTT messages to that topic. \ No newline at end of file diff --git a/openswitcher_proxy/frontend_mqtt.py b/openswitcher_proxy/frontend_mqtt.py index 4da8847..96c8209 100644 --- a/openswitcher_proxy/frontend_mqtt.py +++ b/openswitcher_proxy/frontend_mqtt.py @@ -1,7 +1,9 @@ +import re import threading import logging import json from functools import partial +from json import JSONDecodeError from .error import DependencyError from .frontend_httpapi import FieldEncoder @@ -9,6 +11,7 @@ import pyatem.command as commandmodule try: import paho.mqtt.client as mqtt + from paho.mqtt.subscribeoptions import SubscribeOptions except ModuleNotFoundError: mqtt = None @@ -27,13 +30,18 @@ class MqttFrontendThread(threading.Thread): self.status = 'initializing...' self.error = None self.readonly = not self.config.get('allow-writes', False) + self.subscribe = self.config['topic-subscribe'] if 'topic-subscribe' in self.config else self.topic + + regex = self.subscribe.replace('{hardware}', r'(?P[^/]+)') + regex = regex.replace('{field}', r'(?P.+)') + self.topic_re = re.compile(regex) def run(self): logging.info('MQTT frontend run') host, port = self.config['host'].split(':') port = int(port) - self.client = mqtt.Client(client_id=f'atem-{self.name}', userdata=self) - self.client.on_connect = lambda client, userdata, flags, rc: self.on_mqtt_connect(flags, rc) + self.client = mqtt.Client(client_id=f'atem-{self.name}', userdata=self, protocol=mqtt.MQTTv5) + self.client.on_connect = lambda client, userdata, flags, rc, props: self.on_mqtt_connect(flags, rc, props) self.client.on_message = lambda client, userdata, msg: self.on_mqtt_message(msg) logging.info(f'connecting to {host}:{port}') try: @@ -73,33 +81,39 @@ class MqttFrontendThread(threading.Thread): def on_switcher_disconnected(self, hw): self.on_switcher_changed(hw, 'status', {'upstream': False}) - def on_mqtt_connect(self, flags, rc): + def on_mqtt_connect(self, flags, rc, properties): self.status = 'running' logging.info(f'MQTT: connected ({rc})') if not self.readonly: - self.client.subscribe(f'atem/+/set/#') + sub = self.subscribe.replace('{hardware}', '+').replace('{field}', '#') + logging.info(f'Subscribing to topic {sub}') + self.client.subscribe((sub, SubscribeOptions(noLocal=True))) def on_mqtt_message(self, msg): if self.readonly: logging.error('MQTT writes disabled') return - parts = msg.topic.split('/') - if len(parts) != 4: - logging.error(f'MQTT: malformed command topic: {msg.topic}') - return - hw = parts[1] - if parts[0] != 'atem' or parts[2] != 'set': + + match = self.topic_re.match(msg.topic) + if not match: logging.error(f'MQTT: malformed command topic: {msg.topic}') - return + + hw = match.group('hardware') + fieldname = match.group('field') + if hw not in self.hw_name: logging.error(f'MQTT: not handling writes for "{hw}"') return - fieldname = parts[3] + classname = fieldname.title().replace('-', '') + "Command" if not hasattr(commandmodule, classname): logging.error(f'MQTT: unrecognized command {fieldname}') return - arguments = json.loads(msg.payload) + try: + arguments = json.loads(msg.payload) + except JSONDecodeError as e: + logging.error('received malformed payload, need a JSON dict') + return if not isinstance(arguments, dict): logging.error(f'MQTT: mailformed payload, needs a JSON dict') return -- 2.34.2