fix(inputs.mqtt_consumer): Correctly handle semaphores on messages (#13478)

This commit is contained in:
Joshua Powers 2023-07-07 08:38:39 -06:00 committed by GitHub
parent bfc5a6a084
commit 341190d8a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 5 deletions

View File

@ -246,7 +246,7 @@ func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) {
return
}
if track.Delivered() {
if track.Delivered() && m.PersistentSession {
msg.Ack()
}
@ -261,9 +261,12 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
m.messagesRecv.Incr(1)
metrics, err := m.parser.Parse(msg.Payload())
if err != nil {
msg.Ack()
if err != nil || len(metrics) == 0 {
if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
}
@ -283,16 +286,22 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
if p.Tags != "" {
err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric)
if err != nil {
msg.Ack()
if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
}
}
if p.Fields != "" {
err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric)
if err != nil {
msg.Ack()
if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
}
}