ccc629b92b15c280b980c1e293c673107a733342 — Martijn Braam 2 months ago 88b359c
proxy: mqtt: make subscribe topic configurable

The proxy now subscribes to the same topic as set for sending by default
if allow-writes is enabled. To have a seperate subcription as the
sending topic use the topic-subscribe setting.

The protocol is switched to MQTTv5 to be able to use NoLocal which makes
the broker not send the messages back to the sender when the sending and
receiving topic is the same.
2 files changed, 36 insertions(+), 14 deletions(-)

M docs/proxy.rst
M openswitcher_proxy/frontend_mqtt.py
M docs/proxy.rst => docs/proxy.rst +9 -1
@@ 172,7 172,9 @@ The MQTT frontend will connect to an MQTT server and will sync the hardware stat
    connect = "localhost:1883"
    auth = false
    hardware = "mini"
    allow-writes = false
    topic = "atem/{hardware}/{field}"
    topic-subscribe = "atem/{hardware}/set/{field}"

The proxy will send a new message the the configured MQTT topic every time the state of one of the connected
switchers changes. There's also a special field generated called `status` that contains the connection state

@@ 180,4 182,10 @@ to the hardware on the other side of the proxy.

The `topic` setting defines which topic the messages will go to. In this path you can use the `hardware` variable which
will be the label of the switcher that generated the event and the `field` variable which will be the name of the
changed field in the same format as the `The HTTP API frontend`_ above.
\ No newline at end of file
changed field in the same format as the `The HTTP API frontend`_ above.

The `topic-subscribe` is an optional setting that sets the path the proxy will subscribe to the MQTT broker to receive
messages back. If this is not set the subscribe topic will be the same as the `topic` setting used to send messages.

The `allow-writes` setting defaults to false. If this setting is changed to true it will make the proxy subscribe to
a topic and allow changing the switcher state my sending MQTT messages to that topic.
\ No newline at end of file

M openswitcher_proxy/frontend_mqtt.py => openswitcher_proxy/frontend_mqtt.py +27 -13
@@ 1,7 1,9 @@
import re
import threading
import logging
import json
from functools import partial
from json import JSONDecodeError

from .error import DependencyError
from .frontend_httpapi import FieldEncoder

@@ 9,6 11,7 @@ import pyatem.command as commandmodule

    import paho.mqtt.client as mqtt
    from paho.mqtt.subscribeoptions import SubscribeOptions
except ModuleNotFoundError:
    mqtt = None

@@ 27,13 30,18 @@ class MqttFrontendThread(threading.Thread):
        self.status = 'initializing...'
        self.error = None
        self.readonly = not self.config.get('allow-writes', False)
        self.subscribe = self.config['topic-subscribe'] if 'topic-subscribe' in self.config else self.topic

        regex = self.subscribe.replace('{hardware}', r'(?P<hardware>[^/]+)')
        regex = regex.replace('{field}', r'(?P<field>.+)')
        self.topic_re = re.compile(regex)

    def run(self):
        logging.info('MQTT frontend run')
        host, port = self.config['host'].split(':')
        port = int(port)
        self.client = mqtt.Client(client_id=f'atem-{self.name}', userdata=self)
        self.client.on_connect = lambda client, userdata, flags, rc: self.on_mqtt_connect(flags, rc)
        self.client = mqtt.Client(client_id=f'atem-{self.name}', userdata=self, protocol=mqtt.MQTTv5)
        self.client.on_connect = lambda client, userdata, flags, rc, props: self.on_mqtt_connect(flags, rc, props)
        self.client.on_message = lambda client, userdata, msg: self.on_mqtt_message(msg)
        logging.info(f'connecting to {host}:{port}')

@@ 73,33 81,39 @@ class MqttFrontendThread(threading.Thread):
    def on_switcher_disconnected(self, hw):
        self.on_switcher_changed(hw, 'status', {'upstream': False})

    def on_mqtt_connect(self, flags, rc):
    def on_mqtt_connect(self, flags, rc, properties):
        self.status = 'running'
        logging.info(f'MQTT: connected ({rc})')
        if not self.readonly:
            sub = self.subscribe.replace('{hardware}', '+').replace('{field}', '#')
            logging.info(f'Subscribing to topic {sub}')
            self.client.subscribe((sub, SubscribeOptions(noLocal=True)))

    def on_mqtt_message(self, msg):
        if self.readonly:
            logging.error('MQTT writes disabled')
        parts = msg.topic.split('/')
        if len(parts) != 4:
            logging.error(f'MQTT: malformed command topic: {msg.topic}')
        hw = parts[1]
        if parts[0] != 'atem' or parts[2] != 'set':

        match = self.topic_re.match(msg.topic)
        if not match:
            logging.error(f'MQTT: malformed command topic: {msg.topic}')

        hw = match.group('hardware')
        fieldname = match.group('field')

        if hw not in self.hw_name:
            logging.error(f'MQTT: not handling writes for "{hw}"')
        fieldname = parts[3]

        classname = fieldname.title().replace('-', '') + "Command"
        if not hasattr(commandmodule, classname):
            logging.error(f'MQTT: unrecognized command {fieldname}')
        arguments = json.loads(msg.payload)
            arguments = json.loads(msg.payload)
        except JSONDecodeError as e:
            logging.error('received malformed payload, need a JSON dict')
        if not isinstance(arguments, dict):
            logging.error(f'MQTT: mailformed payload, needs a JSON dict')