@@ 50,6 50,10 @@ regexp = "aquacomputer/(?P<name>[^/]+)/(?P<device>[^/]+)"
# Sets a prefix for a metric.
prefix = "aquacomputer"
+# Number of seconds of a prune interval in which data is considered stale
+# and removed from the metrics. Set this to 0 to never prune.
+pruneInterval = 120
+
# =========================================================================
# Configures the HTTP server
# =========================================================================
@@ 4,6 4,8 @@ import (
"encoding/json"
"fmt"
"regexp"
+ "sync"
+ "time"
"git.sr.ht/~sirn/aquacomputer-mqtt-exporter/internal/config"
"git.sr.ht/~sirn/aquacomputer-mqtt-exporter/internal/mqttclient"
@@ 12,25 14,33 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
+// AquacomputerMetric presents individual metrics.
+type AquacomputerMetric struct {
+ gauge prometheus.Gauge
+ dt time.Time
+}
+
// AquacomputerCollector implements prometheus.Collector.
type AquacomputerCollector struct {
+ sync.Mutex
+
config *config.Config
logger slogger.Logger
- metrics map[string]prometheus.Gauge
+ metrics map[string]*AquacomputerMetric
mqttclient *mqttclient.MQTTClient
}
// Describe implements collector.
func (c *AquacomputerCollector) Describe(ch chan<- *prometheus.Desc) {
for _, m := range c.metrics {
- ch <- m.Desc()
+ ch <- m.gauge.Desc()
}
}
// Collect implements collector.
func (c *AquacomputerCollector) Collect(ch chan<- prometheus.Metric) {
for _, m := range c.metrics {
- ch <- m
+ ch <- m.gauge
}
}
@@ 69,30 79,52 @@ func (c *AquacomputerCollector) NewCollectorFunc() func(mqtt.Client, mqtt.Messag
return
}
+ c.Lock()
+ defer c.Unlock()
+
for _, d := range payload.Data {
comp := fmt.Sprintf("%s_%s_%s", payload.Id, payload.Topic, d.Id)
if _, ok := c.metrics[comp]; !ok {
- c.metrics[comp] = prometheus.NewGauge(prometheus.GaugeOpts{
- Name: c.GenerateName(matched["name"]),
- ConstLabels: prometheus.Labels{
- "device": matched["device"],
- "name": d.Name,
- "sensor": d.Id,
- },
- })
+ c.metrics[comp] = &AquacomputerMetric{
+ gauge: prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: c.GenerateName(matched["name"]),
+ ConstLabels: prometheus.Labels{
+ "device": matched["device"],
+ "name": d.Name,
+ "sensor": d.Id,
+ },
+ }),
+ }
}
- c.metrics[comp].Set(d.Value)
+ c.metrics[comp].dt = time.Now()
+ c.metrics[comp].gauge.Set(d.Value)
}
}
}
+// SetupPruner sets up pruner.
+func (c *AquacomputerCollector) SetupPruner(interval time.Duration) {
+ for range time.Tick(interval * time.Second) {
+ c.Lock()
+ cutoff := time.Now().Add(-1 * interval * time.Second)
+ c.logger.Debugw("running metric pruner", "cutoff", cutoff)
+ for k, v := range c.metrics {
+ if v.dt.Before(cutoff) {
+ c.logger.Debugw("pruning old metric", "key", k)
+ delete(c.metrics, k)
+ }
+ }
+ c.Unlock()
+ }
+}
+
// NewCollector returns an instance of AquacomputerCollector.
func NewCollector(config *config.Config, logger slogger.Logger, mqttclient *mqttclient.MQTTClient) prometheus.Collector {
ac := &AquacomputerCollector{
config: config,
logger: logger,
- metrics: make(map[string]prometheus.Gauge),
+ metrics: make(map[string]*AquacomputerMetric),
mqttclient: mqttclient,
}
@@ 100,5 132,9 @@ func NewCollector(config *config.Config, logger slogger.Logger, mqttclient *mqtt
logger.Fatalf("could not subscribe to topic: %s", err)
}
+ if config.Metric.PruneInterval > 0 {
+ go ac.SetupPruner(time.Duration(config.Metric.PruneInterval))
+ }
+
return ac
}
@@ 29,8 29,9 @@ type HTTPConfig struct {
// MetricConfig represents metric configuration.
type MetricConfig struct {
- Regexp string
- Prefix string
+ Regexp string
+ Prefix string
+ PruneInterval int64
}
// Config represents application configuration.
@@ 53,6 54,7 @@ func NewViper(configPath string) (*viper.Viper, error) {
v.SetDefault("mqtt.topic", "aquacomputer/#")
v.SetDefault("metric.regexp", "aquacomputer/(?P<name>[^/]+)/(?P<device>[^/]+)")
v.SetDefault("metric.prefix", "aquacomputer")
+ v.SetDefault("metric.pruneInterval", 120)
v.SetDefault("http.listen", "0.0.0.0")
v.SetDefault("http.port", "9110")