From 8c8f88f882d0b795ab1605a2969b819e756ac6e7 Mon Sep 17 00:00:00 2001 From: Sirn Thanabulpong Date: Sun, 22 Sep 2024 19:08:04 +0900 Subject: [PATCH] Properly handle reconnection --- cmd/aquacomputer-mqtt-exporter/main.go | 1 - internal/aquacomputer/collector.go | 5 +-- internal/mqttclient/mqttclient.go | 44 ++++++++++++++++++++------ 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/cmd/aquacomputer-mqtt-exporter/main.go b/cmd/aquacomputer-mqtt-exporter/main.go index 1054ed1..7698b75 100644 --- a/cmd/aquacomputer-mqtt-exporter/main.go +++ b/cmd/aquacomputer-mqtt-exporter/main.go @@ -40,7 +40,6 @@ var rootCmd = &cobra.Command{ if err != nil { slogger.Fatalf("could not connect to mqtt broker: %s", err) } - logger.Infof("connected to mqtt broker at %s", mqttclient.GetAddress()) registry := prometheus.NewRegistry() registry.MustRegister(collectors.NewBuildInfoCollector()) diff --git a/internal/aquacomputer/collector.go b/internal/aquacomputer/collector.go index 0d00757..0846860 100644 --- a/internal/aquacomputer/collector.go +++ b/internal/aquacomputer/collector.go @@ -128,8 +128,9 @@ func NewCollector(config *config.Config, logger slogger.Logger, mqttclient *mqtt mqttclient: mqttclient, } - if err := mqttclient.Subscribe(config.MQTT.Topic, ac.newCollectorFunc()); err != nil { - logger.Fatalf("could not subscribe to topic: %s", err) + mqttclient.SetTopicHandler(config.MQTT.Topic, ac.newCollectorFunc()) + if err := mqttclient.Connect(); err != nil { + logger.Fatalf("could not connect: %s", err) } if config.Metric.PruneInterval > 0 { diff --git a/internal/mqttclient/mqttclient.go b/internal/mqttclient/mqttclient.go index efd8e7e..3e9110e 100644 --- a/internal/mqttclient/mqttclient.go +++ b/internal/mqttclient/mqttclient.go @@ -10,6 +10,7 @@ type MQTTClient struct { config *config.Config client mqtt.Client logger slogger.Logger + topics map[string]mqtt.MessageHandler } // NewMQTTClient creates a new MQTTClient. @@ -17,6 +18,7 @@ func NewMQTTClient(config *config.Config, logger slogger.Logger) (*MQTTClient, e mqttclient := &MQTTClient{ config: config, logger: logger, + topics: map[string]mqtt.MessageHandler{}, } mqttOptions := mqtt.NewClientOptions() @@ -24,13 +26,11 @@ func NewMQTTClient(config *config.Config, logger slogger.Logger) (*MQTTClient, e mqttOptions.SetAutoReconnect(true) mqttOptions.SetUsername(config.MQTT.Username) mqttOptions.SetPassword(config.MQTT.Password) + mqttOptions.SetOnConnectHandler(mqttclient.ConnectionHandler) + mqttOptions.SetConnectionLostHandler(mqttclient.ConnectionLostHandler) + mqttOptions.SetReconnectingHandler(mqttclient.ReconnectingHandler) - client := mqtt.NewClient(mqttOptions) - if token := client.Connect(); token.Wait() && token.Error() != nil { - return nil, token.Error() - } - - mqttclient.client = client + mqttclient.client = mqtt.NewClient(mqttOptions) return mqttclient, nil } @@ -39,11 +39,37 @@ func (m *MQTTClient) GetAddress() string { return m.config.MQTT.Broker } -// Subscribe subscribes fn to the given topic. -func (m *MQTTClient) Subscribe(topic string, fn func(mqtt.Client, mqtt.Message)) error { - if token := m.client.Subscribe(topic, 0, fn); token.Wait() && token.Error() != nil { +func (m *MQTTClient) Connect() error { + if token := m.client.Connect(); token.Wait() && token.Error() != nil { return token.Error() } + m.logger.Infof("connected to mqtt broker at %s", m.GetAddress()) return nil } + +// SetTopicHandler sets handler for the given topic. +func (m *MQTTClient) SetTopicHandler(topic string, fn mqtt.MessageHandler) { + m.topics[topic] = fn +} + +// ConnectionHandler handles MQTT connection callback. +func (m *MQTTClient) ConnectionHandler(client mqtt.Client) { + for topic, fn := range m.topics { + m.logger.Infof("subscribing to %s", topic) + if token := m.client.Subscribe(topic, 0, fn); token.Wait() && token.Error() != nil { + m.logger.Errorf("could not subscribe to %s", topic) + continue + } + } +} + +// ConnectionLostHandler handles MQTT connection lost callback. +func (m *MQTTClient) ConnectionLostHandler(client mqtt.Client, err error) { + m.logger.Warnf("connection lost: %s", err.Error()) +} + +// ReconnectionHandler handles MQTT reconnecting callback. +func (m *MQTTClient) ReconnectingHandler(client mqtt.Client, clientOptions *mqtt.ClientOptions) { + m.logger.Infof("reconnecting to %s", m.GetAddress()) +} -- 2.45.2