fix(inputs.mqtt_consumer): rework connection and message tracking (#10696)

This commit is contained in:
Ted M Lin 2022-09-26 13:09:44 -04:00 committed by GitHub
parent df887b2711
commit 2b37d7e508
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 15 deletions

View File

@ -157,6 +157,10 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())
return m.connect()
}
func (m *MQTTConsumer) connect() error {
m.state = Connecting
m.client = m.clientFactory(m.opts)
// AddRoute sets up the function for handling messages. These need to be
// added in case we find a persistent session containing subscriptions so we
@ -165,10 +169,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
for _, topic := range m.Topics {
m.client.AddRoute(topic, m.recvMessage)
}
m.state = Connecting
return m.connect()
}
func (m *MQTTConsumer) connect() error {
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
err := token.Error()
@ -199,24 +199,27 @@ func (m *MQTTConsumer) connect() error {
return nil
}
func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
// Should already be disconnected, but make doubly sure
m.client.Disconnect(5)
m.acc.AddError(fmt.Errorf("connection lost: %v", err))
m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
}
func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
for {
// Drain anything that's been delivered
select {
case track := <-m.acc.Delivered():
<-m.sem
m.messagesMutex.Lock()
_, ok := m.messages[track.ID()]
if !ok {
// Added by a previous connection
continue
}
// No ack, MQTT does not support durable handling
delete(m.messages, track.ID())
m.messagesMutex.Unlock()
m.onDelivered(track)
continue
default:
}
// Wait for room to accumulate metric, but make delivery progress if possible
// (Note that select will randomly pick a case if both are available)
select {
case track := <-m.acc.Delivered():
m.onDelivered(track)
case m.sem <- empty{}:
err := m.onMessage(m.acc, msg)
if err != nil {
@ -243,6 +246,17 @@ func compareTopics(expected []string, incoming []string) bool {
return true
}
func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) {
<-m.sem
m.messagesMutex.Lock()
_, ok := m.messages[track.ID()]
if ok {
// No ack, MQTT does not support durable handling
delete(m.messages, track.ID())
}
m.messagesMutex.Unlock()
}
func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error {
payloadBytes := len(msg.Payload())
m.payloadSize.Incr(int64(payloadBytes))
@ -297,7 +311,6 @@ func (m *MQTTConsumer) Stop() {
}
func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if m.state == Disconnected {
m.state = Connecting
m.Log.Debugf("Connecting %v", m.Servers)
return m.connect()
}