fix(inputs.mqtt): ACK messages when persistence enabled (#13350)

This commit is contained in:
Joshua Powers 2023-06-07 12:50:00 -06:00 committed by GitHub
parent db84ef4667
commit ebe346103e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 47 additions and 41 deletions

View File

@ -84,13 +84,14 @@ type MQTTConsumer struct {
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool
messages map[telegraf.TrackingID]mqtt.Message
messagesMutex sync.Mutex
topicTagParse string
ctx context.Context
cancel context.CancelFunc
payloadSize selfstat.Stat
messagesRecv selfstat.Stat
wg sync.WaitGroup
}
func (*MQTTConsumer) SampleConfig() string {
@ -120,7 +121,7 @@ func (m *MQTTConsumer) Init() error {
return err
}
m.opts = opts
m.messages = map[telegraf.TrackingID]bool{}
m.messages = map[telegraf.TrackingID]mqtt.Message{}
for i, p := range m.TopicParsing {
splitMeasurement := strings.Split(p.Measurement, "/")
@ -156,6 +157,20 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())
m.wg.Add(1)
go func() {
defer m.wg.Done()
for {
select {
case <-m.ctx.Done():
return
case track := <-m.acc.Delivered():
m.onDelivered(track)
}
}
}()
return m.connect()
}
func (m *MQTTConsumer) connect() error {
@ -166,7 +181,7 @@ func (m *MQTTConsumer) connect() error {
// know where to dispatch persisted and new messages to. In the alternate
// case that we need to create the subscriptions these will be replaced.
for _, topic := range m.Topics {
m.client.AddRoute(topic, m.recvMessage)
m.client.AddRoute(topic, m.onMessage)
}
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
@ -189,7 +204,7 @@ func (m *MQTTConsumer) connect() error {
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}
subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken := m.client.SubscribeMultiple(topics, m.onMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
m.acc.AddError(fmt.Errorf("subscription error: topics %q: %w", strings.Join(m.Topics[:], ","), subscribeToken.Error()))
@ -203,31 +218,6 @@ func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
}
func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
for {
// Drain anything that's been delivered
select {
case track := <-m.acc.Delivered():
m.onDelivered(track)
continue
default:
}
// Wait for room to accumulate metric, but make delivery progress if possible
// (Note that select will randomly pick a case if both are available)
select {
case track := <-m.acc.Delivered():
m.onDelivered(track)
case m.sem <- empty{}:
err := m.onMessage(m.acc, msg)
if err != nil {
m.acc.AddError(err)
<-m.sem
}
return
}
}
}
// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value
func compareTopics(expected []string, incoming []string) bool {
@ -246,23 +236,35 @@ func compareTopics(expected []string, incoming []string) bool {
func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) {
<-m.sem
m.messagesMutex.Lock()
_, ok := m.messages[track.ID()]
if ok {
// No ack, MQTT does not support durable handling
delete(m.messages, track.ID())
defer m.messagesMutex.Unlock()
msg, ok := m.messages[track.ID()]
if !ok {
m.Log.Errorf("could not mark message delivered: %d", track.ID())
return
}
m.messagesMutex.Unlock()
if track.Delivered() {
msg.Ack()
}
delete(m.messages, track.ID())
}
func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error {
func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
m.sem <- empty{}
payloadBytes := len(msg.Payload())
m.payloadSize.Incr(int64(payloadBytes))
m.messagesRecv.Incr(1)
metrics, err := m.parser.Parse(msg.Payload())
if err != nil {
return err
msg.Ack()
m.acc.AddError(err)
return
}
for _, metric := range metrics {
@ -281,22 +283,25 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess
if p.Tags != "" {
err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric)
if err != nil {
return err
msg.Ack()
m.acc.AddError(err)
return
}
}
if p.Fields != "" {
err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric)
if err != nil {
return err
msg.Ack()
m.acc.AddError(err)
return
}
}
}
}
id := acc.AddTrackingMetricGroup(metrics)
id := m.acc.AddTrackingMetricGroup(metrics)
m.messagesMutex.Lock()
m.messages[id] = true
m.messages[id] = msg
m.messagesMutex.Unlock()
return nil
}
func (m *MQTTConsumer) Stop() {
if m.state == Connected {
@ -368,6 +373,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts.SetAutoReconnect(false)
opts.SetKeepAlive(time.Second * 60)
opts.SetCleanSession(!m.PersistentSession)
opts.SetAutoAckDisabled(m.PersistentSession)
opts.SetConnectionLostHandler(m.onConnectionLost)
return opts, nil
}