diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 8c5dcae22..059913542 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -246,7 +246,7 @@ func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) { return } - if track.Delivered() { + if track.Delivered() && m.PersistentSession { msg.Ack() } @@ -261,9 +261,12 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { m.messagesRecv.Incr(1) metrics, err := m.parser.Parse(msg.Payload()) - if err != nil { - msg.Ack() + if err != nil || len(metrics) == 0 { + if m.PersistentSession { + msg.Ack() + } m.acc.AddError(err) + <-m.sem return } @@ -283,16 +286,22 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { if p.Tags != "" { err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric) if err != nil { - msg.Ack() + if m.PersistentSession { + msg.Ack() + } m.acc.AddError(err) + <-m.sem return } } if p.Fields != "" { err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric) if err != nil { - msg.Ack() + if m.PersistentSession { + msg.Ack() + } m.acc.AddError(err) + <-m.sem return } }