From 341190d8a4f191ec07b211ce008f5f45623c5d65 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Fri, 7 Jul 2023 08:38:39 -0600 Subject: [PATCH] fix(inputs.mqtt_consumer): Correctly handle semaphores on messages (#13478) --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 8c5dcae22..059913542 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -246,7 +246,7 @@ func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) { return } - if track.Delivered() { + if track.Delivered() && m.PersistentSession { msg.Ack() } @@ -261,9 +261,12 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { m.messagesRecv.Incr(1) metrics, err := m.parser.Parse(msg.Payload()) - if err != nil { - msg.Ack() + if err != nil || len(metrics) == 0 { + if m.PersistentSession { + msg.Ack() + } m.acc.AddError(err) + <-m.sem return } @@ -283,16 +286,22 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { if p.Tags != "" { err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric) if err != nil { - msg.Ack() + if m.PersistentSession { + msg.Ack() + } m.acc.AddError(err) + <-m.sem return } } if p.Fields != "" { err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric) if err != nil { - msg.Ack() + if m.PersistentSession { + msg.Ack() + } m.acc.AddError(err) + <-m.sem return } }