From 2b37d7e5086497d3eafbbf12c442b9337518c31a Mon Sep 17 00:00:00 2001 From: Ted M Lin Date: Mon, 26 Sep 2022 13:09:44 -0400 Subject: [PATCH] fix(inputs.mqtt_consumer): rework connection and message tracking (#10696) --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index b41fa31b4..859f12fd4 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -157,6 +157,10 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.acc = acc.WithTracking(m.MaxUndeliveredMessages) m.sem = make(semaphore, m.MaxUndeliveredMessages) m.ctx, m.cancel = context.WithCancel(context.Background()) + return m.connect() +} +func (m *MQTTConsumer) connect() error { + m.state = Connecting m.client = m.clientFactory(m.opts) // AddRoute sets up the function for handling messages. These need to be // added in case we find a persistent session containing subscriptions so we @@ -165,10 +169,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { for _, topic := range m.Topics { m.client.AddRoute(topic, m.recvMessage) } - m.state = Connecting - return m.connect() -} -func (m *MQTTConsumer) connect() error { token := m.client.Connect() if token.Wait() && token.Error() != nil { err := token.Error() @@ -199,24 +199,27 @@ func (m *MQTTConsumer) connect() error { return nil } func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) { + // Should already be disconnected, but make doubly sure + m.client.Disconnect(5) m.acc.AddError(fmt.Errorf("connection lost: %v", err)) m.Log.Debugf("Disconnected %v", m.Servers) m.state = Disconnected } func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) { for { + // Drain anything that's been delivered select { case track := <-m.acc.Delivered(): - <-m.sem - m.messagesMutex.Lock() - _, ok := m.messages[track.ID()] - if !ok { - // Added by a previous connection - continue - } - // No ack, MQTT does not support durable handling - delete(m.messages, track.ID()) - m.messagesMutex.Unlock() + m.onDelivered(track) + continue + default: + } + + // Wait for room to accumulate metric, but make delivery progress if possible + // (Note that select will randomly pick a case if both are available) + select { + case track := <-m.acc.Delivered(): + m.onDelivered(track) case m.sem <- empty{}: err := m.onMessage(m.acc, msg) if err != nil { @@ -243,6 +246,17 @@ func compareTopics(expected []string, incoming []string) bool { return true } +func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) { + <-m.sem + m.messagesMutex.Lock() + _, ok := m.messages[track.ID()] + if ok { + // No ack, MQTT does not support durable handling + delete(m.messages, track.ID()) + } + m.messagesMutex.Unlock() +} + func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error { payloadBytes := len(msg.Payload()) m.payloadSize.Incr(int64(payloadBytes)) @@ -297,7 +311,6 @@ func (m *MQTTConsumer) Stop() { } func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error { if m.state == Disconnected { - m.state = Connecting m.Log.Debugf("Connecting %v", m.Servers) return m.connect() }