fix issue with mqtt concurrent map write (#8562)
This commit is contained in:
parent
b858eb962a
commit
e39208d60a
|
|
@ -5,9 +5,10 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
|
|
@ -69,6 +70,7 @@ type MQTTConsumer struct {
|
||||||
state ConnectionState
|
state ConnectionState
|
||||||
sem semaphore
|
sem semaphore
|
||||||
messages map[telegraf.TrackingID]bool
|
messages map[telegraf.TrackingID]bool
|
||||||
|
messagesMutex sync.Mutex
|
||||||
topicTag string
|
topicTag string
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
@ -219,7 +221,9 @@ func (m *MQTTConsumer) connect() error {
|
||||||
|
|
||||||
m.Log.Infof("Connected %v", m.Servers)
|
m.Log.Infof("Connected %v", m.Servers)
|
||||||
m.state = Connected
|
m.state = Connected
|
||||||
|
m.messagesMutex.Lock()
|
||||||
m.messages = make(map[telegraf.TrackingID]bool)
|
m.messages = make(map[telegraf.TrackingID]bool)
|
||||||
|
m.messagesMutex.Unlock()
|
||||||
|
|
||||||
// Persistent sessions should skip subscription if a session is present, as
|
// Persistent sessions should skip subscription if a session is present, as
|
||||||
// the subscriptions are stored by the server.
|
// the subscriptions are stored by the server.
|
||||||
|
|
@ -258,6 +262,7 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) {
|
||||||
select {
|
select {
|
||||||
case track := <-m.acc.Delivered():
|
case track := <-m.acc.Delivered():
|
||||||
<-m.sem
|
<-m.sem
|
||||||
|
m.messagesMutex.Lock()
|
||||||
_, ok := m.messages[track.ID()]
|
_, ok := m.messages[track.ID()]
|
||||||
if !ok {
|
if !ok {
|
||||||
// Added by a previous connection
|
// Added by a previous connection
|
||||||
|
|
@ -265,6 +270,7 @@ func (m *MQTTConsumer) recvMessage(c mqtt.Client, msg mqtt.Message) {
|
||||||
}
|
}
|
||||||
// No ack, MQTT does not support durable handling
|
// No ack, MQTT does not support durable handling
|
||||||
delete(m.messages, track.ID())
|
delete(m.messages, track.ID())
|
||||||
|
m.messagesMutex.Unlock()
|
||||||
case m.sem <- empty{}:
|
case m.sem <- empty{}:
|
||||||
err := m.onMessage(m.acc, msg)
|
err := m.onMessage(m.acc, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -290,7 +296,9 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess
|
||||||
}
|
}
|
||||||
|
|
||||||
id := acc.AddTrackingMetricGroup(metrics)
|
id := acc.AddTrackingMetricGroup(metrics)
|
||||||
|
m.messagesMutex.Lock()
|
||||||
m.messages[id] = true
|
m.messages[id] = true
|
||||||
|
m.messagesMutex.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue