M cmd/aquacomputer-mqtt-exporter/main.go => cmd/aquacomputer-mqtt-exporter/main.go +0 -1
@@ 40,7 40,6 @@ var rootCmd = &cobra.Command{
if err != nil {
slogger.Fatalf("could not connect to mqtt broker: %s", err)
}
- logger.Infof("connected to mqtt broker at %s", mqttclient.GetAddress())
registry := prometheus.NewRegistry()
registry.MustRegister(collectors.NewBuildInfoCollector())
M internal/aquacomputer/collector.go => internal/aquacomputer/collector.go +3 -2
@@ 128,8 128,9 @@ func NewCollector(config *config.Config, logger slogger.Logger, mqttclient *mqtt
mqttclient: mqttclient,
}
- if err := mqttclient.Subscribe(config.MQTT.Topic, ac.newCollectorFunc()); err != nil {
- logger.Fatalf("could not subscribe to topic: %s", err)
+ mqttclient.SetTopicHandler(config.MQTT.Topic, ac.newCollectorFunc())
+ if err := mqttclient.Connect(); err != nil {
+ logger.Fatalf("could not connect: %s", err)
}
if config.Metric.PruneInterval > 0 {
M internal/mqttclient/mqttclient.go => internal/mqttclient/mqttclient.go +35 -9
@@ 10,6 10,7 @@ type MQTTClient struct {
config *config.Config
client mqtt.Client
logger slogger.Logger
+ topics map[string]mqtt.MessageHandler
}
// NewMQTTClient creates a new MQTTClient.
@@ 17,6 18,7 @@ func NewMQTTClient(config *config.Config, logger slogger.Logger) (*MQTTClient, e
mqttclient := &MQTTClient{
config: config,
logger: logger,
+ topics: map[string]mqtt.MessageHandler{},
}
mqttOptions := mqtt.NewClientOptions()
@@ 24,13 26,11 @@ func NewMQTTClient(config *config.Config, logger slogger.Logger) (*MQTTClient, e
mqttOptions.SetAutoReconnect(true)
mqttOptions.SetUsername(config.MQTT.Username)
mqttOptions.SetPassword(config.MQTT.Password)
+ mqttOptions.SetOnConnectHandler(mqttclient.ConnectionHandler)
+ mqttOptions.SetConnectionLostHandler(mqttclient.ConnectionLostHandler)
+ mqttOptions.SetReconnectingHandler(mqttclient.ReconnectingHandler)
- client := mqtt.NewClient(mqttOptions)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- return nil, token.Error()
- }
-
- mqttclient.client = client
+ mqttclient.client = mqtt.NewClient(mqttOptions)
return mqttclient, nil
}
@@ 39,11 39,37 @@ func (m *MQTTClient) GetAddress() string {
return m.config.MQTT.Broker
}
-// Subscribe subscribes fn to the given topic.
-func (m *MQTTClient) Subscribe(topic string, fn func(mqtt.Client, mqtt.Message)) error {
- if token := m.client.Subscribe(topic, 0, fn); token.Wait() && token.Error() != nil {
+func (m *MQTTClient) Connect() error {
+ if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
+ m.logger.Infof("connected to mqtt broker at %s", m.GetAddress())
return nil
}
+
+// SetTopicHandler sets handler for the given topic.
+func (m *MQTTClient) SetTopicHandler(topic string, fn mqtt.MessageHandler) {
+ m.topics[topic] = fn
+}
+
+// ConnectionHandler handles MQTT connection callback.
+func (m *MQTTClient) ConnectionHandler(client mqtt.Client) {
+ for topic, fn := range m.topics {
+ m.logger.Infof("subscribing to %s", topic)
+ if token := m.client.Subscribe(topic, 0, fn); token.Wait() && token.Error() != nil {
+ m.logger.Errorf("could not subscribe to %s", topic)
+ continue
+ }
+ }
+}
+
+// ConnectionLostHandler handles MQTT connection lost callback.
+func (m *MQTTClient) ConnectionLostHandler(client mqtt.Client, err error) {
+ m.logger.Warnf("connection lost: %s", err.Error())
+}
+
+// ReconnectionHandler handles MQTT reconnecting callback.
+func (m *MQTTClient) ReconnectingHandler(client mqtt.Client, clientOptions *mqtt.ClientOptions) {
+ m.logger.Infof("reconnecting to %s", m.GetAddress())
+}