From d2dc828df01451fe3677dd31765ebe8fac3d4b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Kundr=C3=A1t?= Date: Sun, 14 Nov 2021 00:56:13 +0100 Subject: [PATCH] proxy: mqtt: allow arbitrary writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since there are typically no server-side MQTT ACLs, this has to be enabled explicitly. There's some duplication of code between this implementation and the one in the HTTP backend. The most straightforward way of fixing that is to convert these early returns into proper exceptions, but I'm too lazy for this and I'm gonna be using this feature tomorrow :). Signed-off-by: Jan Kundrát --- openswitcher_proxy/frontend_mqtt.py | 43 ++++++++++++++++++++++++++--- proxy.toml | 1 + 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/openswitcher_proxy/frontend_mqtt.py b/openswitcher_proxy/frontend_mqtt.py index 63538c6..ff0b23a 100644 --- a/openswitcher_proxy/frontend_mqtt.py +++ b/openswitcher_proxy/frontend_mqtt.py @@ -5,6 +5,7 @@ from functools import partial from .error import DependencyError from .frontend_httpapi import FieldEncoder +import pyatem.command as commandmodule try: import paho.mqtt.client as mqtt @@ -74,12 +75,46 @@ class MqttFrontendThread(threading.Thread): def on_mqtt_connect(self, flags, rc): self.status = 'running' logging.info(f'MQTT: connected ({rc})') - # TODO: enable once on_mqtt_message() works - # client.subscribe(f'atem/{userdata.hw_name}/#') + self.client.subscribe(f'atem/+/set/#') def on_mqtt_message(self, msg): - # TODO: propagate to the switcher, eventually - logging.debug(f'MQTT: msg: {msg.topic} {msg.payload}') + if not self.config.get('allow-writes', False): + logging.error('MQTT writes disabled') + return + parts = msg.topic.split('/') + if len(parts) != 4: + logging.error(f'MQTT: malformed command topic: {msg.topic}') + return + hw = parts[1] + if parts[0] != 'atem' or parts[2] != 'set': + logging.error(f'MQTT: malformed command topic: {msg.topic}') + return + if hw not in self.hw_name: + logging.error(f'MQTT: not handling writes for "{hw}"') + return + fieldname = parts[3] + classname = fieldname.title().replace('-', '') + "Command" + if not hasattr(commandmodule, classname): + logging.error(f'MQTT: unrecognized command {fieldname}') + return + arguments = json.loads(msg.payload) + if not isinstance(arguments, dict): + logging.error(f'MQTT: mailformed payload, needs a JSON dict') + return + for key in arguments: + try: + arguments[key] = int(arguments[key]) + except: + pass + if 'source' in arguments: + inputs = self.threadlist['hardware'][hw].switcher.inputs + if arguments['source'] in inputs: + arguments['source'] = inputs[arguments['source']] + try: + cmd = getattr(commandmodule, classname)(**arguments) + self.threadlist['hardware'][hw].switcher.send_commands([cmd]) + except Exception as e: + logging.error(f'MQTT: cannot write {fieldname}: {str(e)}') def get_status(self): if self.status == 'error': diff --git a/proxy.toml b/proxy.toml index bd6b213..1e55bf5 100644 --- a/proxy.toml +++ b/proxy.toml @@ -36,3 +36,4 @@ type = "mqtt" host = "localhost:1883" hardware = "mini" auth = false +allow-writes = false -- 2.34.2