fix: add mutex to gnmi lookup map (#11008)
This commit is contained in:
parent
da31a19d5b
commit
72997edf7b
|
|
@ -57,7 +57,8 @@ type GNMI struct {
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
// Lookup/device+name/key/value
|
// Lookup/device+name/key/value
|
||||||
lookup map[string]map[string]map[string]interface{}
|
lookup map[string]map[string]map[string]interface{}
|
||||||
|
lookupMutex sync.Mutex
|
||||||
|
|
||||||
Log telegraf.Logger
|
Log telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
@ -88,7 +89,9 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
|
||||||
var request *gnmiLib.SubscribeRequest
|
var request *gnmiLib.SubscribeRequest
|
||||||
c.acc = acc
|
c.acc = acc
|
||||||
ctx, c.cancel = context.WithCancel(context.Background())
|
ctx, c.cancel = context.WithCancel(context.Background())
|
||||||
|
c.lookupMutex.Lock()
|
||||||
c.lookup = make(map[string]map[string]map[string]interface{})
|
c.lookup = make(map[string]map[string]map[string]interface{})
|
||||||
|
c.lookupMutex.Unlock()
|
||||||
|
|
||||||
// Validate configuration
|
// Validate configuration
|
||||||
if request, err = c.newSubscribeRequest(); err != nil {
|
if request, err = c.newSubscribeRequest(); err != nil {
|
||||||
|
|
@ -142,7 +145,9 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
if subscription.TagOnly {
|
if subscription.TagOnly {
|
||||||
// Create the top-level lookup for this tag
|
// Create the top-level lookup for this tag
|
||||||
|
c.lookupMutex.Lock()
|
||||||
c.lookup[name] = make(map[string]map[string]interface{})
|
c.lookup[name] = make(map[string]map[string]interface{})
|
||||||
|
c.lookupMutex.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for alias, encodingPath := range c.Aliases {
|
for alias, encodingPath := range c.Aliases {
|
||||||
|
|
@ -310,6 +315,7 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
|
||||||
|
|
||||||
// Update tag lookups and discard rest of update
|
// Update tag lookups and discard rest of update
|
||||||
subscriptionKey := tags["source"] + "/" + tags["name"]
|
subscriptionKey := tags["source"] + "/" + tags["name"]
|
||||||
|
c.lookupMutex.Lock()
|
||||||
if _, ok := c.lookup[name]; ok {
|
if _, ok := c.lookup[name]; ok {
|
||||||
// We are subscribed to this, so add the fields to the lookup-table
|
// We are subscribed to this, so add the fields to the lookup-table
|
||||||
if _, ok := c.lookup[name][subscriptionKey]; !ok {
|
if _, ok := c.lookup[name][subscriptionKey]; !ok {
|
||||||
|
|
@ -318,6 +324,7 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
|
||||||
for k, v := range fields {
|
for k, v := range fields {
|
||||||
c.lookup[name][subscriptionKey][path.Base(k)] = v
|
c.lookup[name][subscriptionKey][path.Base(k)] = v
|
||||||
}
|
}
|
||||||
|
c.lookupMutex.Unlock()
|
||||||
// Do not process the data further as we only subscribed here for the lookup table
|
// Do not process the data further as we only subscribed here for the lookup table
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -330,6 +337,7 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
c.lookupMutex.Unlock()
|
||||||
|
|
||||||
// Group metrics
|
// Group metrics
|
||||||
for k, v := range fields {
|
for k, v := range fields {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue