fix(gnmi): refactor tag-only subs for complex keys (#11011)

This commit is contained in:
bewing 2022-07-07 13:50:40 -05:00 committed by GitHub
parent 5b238cb21c
commit f29f7b28f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 734 additions and 162 deletions

View File

@ -58,7 +58,7 @@ It has been optimized to support gNMI telemetry as produced by Cisco IOS XR
## origin usually refers to a (YANG) data model implemented by the device
## and path to a specific substructure inside it that should be subscribed to (similar to an XPath)
## YANG models can be found e.g. here: https://github.com/YangModels/yang/tree/master/vendor/cisco/xr
origin = "openconfig-interfaces"
origin = "openconfig"
path = "/interfaces/interface/state/counters"
# Subscription mode (one of: "target_defined", "sample", "on_change") and interval
@ -71,17 +71,19 @@ It has been optimized to support gNMI telemetry as produced by Cisco IOS XR
## If suppression is enabled, send updates at least every X seconds anyway
# heartbeat_interval = "60s"
#[[inputs.gnmi.subscription]]
# name = "descr"
# origin = "openconfig-interfaces"
# path = "/interfaces/interface/state/description"
# subscription_mode = "on_change"
## Tag subscriptions are subscriptions to paths intended to be applied as tags to other subscriptions
[[inputs.gnmi.tag_subscription]]
# When applying this value as a tag to other metrics, use this tag name
name = "descr"
# All other subscription fields are as normal
origin = "openconfig"
path = "/interfaces/interface/state/description"
subscription_mode = "on_change"
# At least one path element name must be supplied that contains at least one key to match on
# Multiple element names can be specified in any order - all element names must be present and contain
# to be stored as an in-memory tag
elements = ["interface"]
## If tag_only is set, the subscription in question will be utilized to maintain a map of
## tags to apply to other measurements emitted by the plugin, by matching path keys
## All fields from the tag-only subscription will be applied as tags to other readings,
## in the format <name>_<fieldBase>.
# tag_only = true
```
## Example Output

View File

@ -36,9 +36,10 @@ var sampleConfig string
// gNMI plugin instance
type GNMI struct {
Addresses []string `toml:"addresses"`
Subscriptions []Subscription `toml:"subscription"`
Aliases map[string]string `toml:"aliases"`
Addresses []string `toml:"addresses"`
Subscriptions []Subscription `toml:"subscription"`
TagSubscriptions []TagSubscription `toml:"tag_subscription"`
Aliases map[string]string `toml:"aliases"`
// Optional subscription configuration
Encoding string
@ -63,19 +64,36 @@ type GNMI struct {
acc telegraf.Accumulator
cancel context.CancelFunc
wg sync.WaitGroup
// Lookup/device+name/key/value
lookup map[string]map[string]map[string]interface{}
lookupMutex sync.Mutex
legacyTags bool
Log telegraf.Logger
}
type Worker struct {
address string
tagStore *tagNode
}
type tagNode struct {
elem *gnmiLib.PathElem
tagName string
value *gnmiLib.TypedValue
tagStore map[string][]*tagNode
}
type tagResults struct {
names []string
values []*gnmiLib.TypedValue
}
// Subscription for a gNMI client
type Subscription struct {
Name string
Origin string
Path string
fullPath *gnmiLib.Path
// Subscription mode and interval
SubscriptionMode string `toml:"subscription_mode"`
SampleInterval config.Duration `toml:"sample_interval"`
@ -88,6 +106,12 @@ type Subscription struct {
TagOnly bool `toml:"tag_only"`
}
// Tag Subscription for a gNMI client
type TagSubscription struct {
Subscription
Elements []string
}
func (*GNMI) SampleConfig() string {
return sampleConfig
}
@ -100,9 +124,33 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
var request *gnmiLib.SubscribeRequest
c.acc = acc
ctx, c.cancel = context.WithCancel(context.Background())
c.lookupMutex.Lock()
c.lookup = make(map[string]map[string]map[string]interface{})
c.lookupMutex.Unlock()
for i := len(c.Subscriptions) - 1; i >= 0; i-- {
subscription := c.Subscriptions[i]
// Support legacy TagOnly subscriptions
if subscription.TagOnly {
tagSub := convertTagOnlySubscription(subscription)
c.TagSubscriptions = append(c.TagSubscriptions, tagSub)
// Remove from the original subscriptions list
c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...)
c.legacyTags = true
continue
}
if err = subscription.buildFullPath(c); err != nil {
return err
}
}
for idx := range c.TagSubscriptions {
if err = c.TagSubscriptions[idx].buildFullPath(c); err != nil {
return err
}
if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly {
return fmt.Errorf("do not mix legacy tag_only subscriptions and tag subscriptions")
}
if len(c.TagSubscriptions[idx].Elements) == 0 {
return fmt.Errorf("tag_subscription must have at least one element")
}
}
// Validate configuration
if request, err = c.newSubscribeRequest(); err != nil {
@ -123,44 +171,19 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
}
// Invert explicit alias list and prefill subscription names
c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases))
for _, subscription := range c.Subscriptions {
var gnmiLongPath, gnmiShortPath *gnmiLib.Path
c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions))
// Build the subscription path without keys
if gnmiLongPath, err = parsePath(subscription.Origin, subscription.Path, ""); err != nil {
for _, s := range c.Subscriptions {
if err := s.buildAlias(c.internalAliases); err != nil {
return err
}
if gnmiShortPath, err = parsePath("", subscription.Path, ""); err != nil {
return err
}
longPath, _, err := c.handlePath(gnmiLongPath, nil, "")
if err != nil {
return fmt.Errorf("handling long-path failed: %v", err)
}
shortPath, _, err := c.handlePath(gnmiShortPath, nil, "")
if err != nil {
return fmt.Errorf("handling short-path failed: %v", err)
}
name := subscription.Name
// If the user didn't provide a measurement name, use last path element
if len(name) == 0 {
name = path.Base(shortPath)
}
if len(name) > 0 {
c.internalAliases[longPath] = name
c.internalAliases[shortPath] = name
}
if subscription.TagOnly {
// Create the top-level lookup for this tag
c.lookupMutex.Lock()
c.lookup[name] = make(map[string]map[string]interface{})
c.lookupMutex.Unlock()
}
}
for _, s := range c.TagSubscriptions {
if err := s.buildAlias(c.internalAliases); err != nil {
return err
}
}
for alias, encodingPath := range c.Aliases {
c.internalAliases[encodingPath] = alias
}
@ -168,10 +191,12 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
// Create a goroutine for each device, dial and subscribe
c.wg.Add(len(c.Addresses))
for _, addr := range c.Addresses {
go func(address string) {
worker := Worker{address: addr}
worker.tagStore = &tagNode{}
go func(worker Worker) {
defer c.wg.Done()
for ctx.Err() == nil {
if err := c.subscribeGNMI(ctx, address, tlscfg, request); err != nil && ctx.Err() == nil {
if err := c.subscribeGNMI(ctx, &worker, tlscfg, request); err != nil && ctx.Err() == nil {
acc.AddError(err)
}
@ -180,30 +205,42 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
case <-time.After(time.Duration(c.Redial)):
}
}
}(addr)
}(worker)
}
return nil
}
func (s *Subscription) buildSubscription() (*gnmiLib.Subscription, error) {
gnmiPath, err := parsePath(s.Origin, s.Path, "")
if err != nil {
return nil, err
}
mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(s.SubscriptionMode)]
if !ok {
return nil, fmt.Errorf("invalid subscription mode %s", s.SubscriptionMode)
}
return &gnmiLib.Subscription{
Path: gnmiPath,
Mode: gnmiLib.SubscriptionMode(mode),
HeartbeatInterval: uint64(time.Duration(s.HeartbeatInterval).Nanoseconds()),
SampleInterval: uint64(time.Duration(s.SampleInterval).Nanoseconds()),
SuppressRedundant: s.SuppressRedundant,
}, nil
}
// Create a new gNMI SubscribeRequest
func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) {
// Create subscription objects
subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions))
for i, subscription := range c.Subscriptions {
gnmiPath, err := parsePath(subscription.Origin, subscription.Path, "")
if err != nil {
var err error
subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions)+len(c.TagSubscriptions))
for i, subscription := range c.TagSubscriptions {
if subscriptions[i], err = subscription.buildSubscription(); err != nil {
return nil, err
}
mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(subscription.SubscriptionMode)]
if !ok {
return nil, fmt.Errorf("invalid subscription mode %s", subscription.SubscriptionMode)
}
subscriptions[i] = &gnmiLib.Subscription{
Path: gnmiPath,
Mode: gnmiLib.SubscriptionMode(mode),
SampleInterval: uint64(time.Duration(subscription.SampleInterval).Nanoseconds()),
SuppressRedundant: subscription.SuppressRedundant,
HeartbeatInterval: uint64(time.Duration(subscription.HeartbeatInterval).Nanoseconds()),
}
for i, subscription := range c.Subscriptions {
if subscriptions[i+len(c.TagSubscriptions)], err = subscription.buildSubscription(); err != nil {
return nil, err
}
}
@ -231,7 +268,7 @@ func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) {
}
// SubscribeGNMI and extract telemetry data
func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
func (c *GNMI) subscribeGNMI(ctx context.Context, worker *Worker, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
var opt grpc.DialOption
if tlscfg != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlscfg))
@ -239,7 +276,7 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co
opt = grpc.WithInsecure()
}
client, err := grpc.DialContext(ctx, address, opt)
client, err := grpc.DialContext(ctx, worker.address, opt)
if err != nil {
return fmt.Errorf("failed to dial: %v", err)
}
@ -258,8 +295,8 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co
}
}
c.Log.Debugf("Connection to gNMI device %s established", address)
defer c.Log.Debugf("Connection to gNMI device %s closed", address)
c.Log.Debugf("Connection to gNMI device %s established", worker.address)
defer c.Log.Debugf("Connection to gNMI device %s closed", worker.address)
for ctx.Err() == nil {
var reply *gnmiLib.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil {
@ -269,22 +306,22 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co
break
}
c.handleSubscribeResponse(address, reply)
c.handleSubscribeResponse(worker, reply)
}
return nil
}
func (c *GNMI) handleSubscribeResponse(address string, reply *gnmiLib.SubscribeResponse) {
func (c *GNMI) handleSubscribeResponse(worker *Worker, reply *gnmiLib.SubscribeResponse) {
switch response := reply.Response.(type) {
case *gnmiLib.SubscribeResponse_Update:
c.handleSubscribeResponseUpdate(address, response)
c.handleSubscribeResponseUpdate(worker, response)
case *gnmiLib.SubscribeResponse_Error:
c.Log.Errorf("Subscribe error (%d), %q", response.Error.Code, response.Error.Message)
}
}
// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.SubscribeResponse_Update) {
func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.SubscribeResponse_Update) {
var prefix, prefixAliasPath string
grouper := metric.NewSeriesGrouper()
timestamp := time.Unix(0, response.Update.Timestamp)
@ -292,16 +329,30 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
if response.Update.Prefix != nil {
var err error
if prefix, prefixAliasPath, err = c.handlePath(response.Update.Prefix, prefixTags, ""); err != nil {
if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, c.internalAliases, ""); err != nil {
c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
}
}
prefixTags["source"], _, _ = net.SplitHostPort(address)
prefixTags["source"], _, _ = net.SplitHostPort(worker.address)
prefixTags["path"] = prefix
// Process and remove tag-only updates from the response
for i := len(response.Update.Update) - 1; i >= 0; i-- {
update := response.Update.Update[i]
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
for _, tagSub := range c.TagSubscriptions {
if equalPathNoKeys(fullPath, tagSub.fullPath) {
worker.storeTags(update, tagSub)
response.Update.Update = append(response.Update.Update[:i], response.Update.Update[i+1:]...)
}
}
}
// Parse individual Update message and create measurements
var name, lastAliasPath string
for _, update := range response.Update.Update {
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
// Prepare tags from prefix
tags := make(map[string]string, len(prefixTags))
for key, val := range prefixTags {
@ -309,6 +360,16 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
}
aliasPath, fields := c.handleTelemetryField(update, tags, prefix)
if tagOnlyTags := worker.checkTags(fullPath, c.TagSubscriptions); tagOnlyTags != nil {
for k, v := range tagOnlyTags {
if alias, ok := c.internalAliases[k]; ok {
tags[alias] = fmt.Sprint(v)
} else {
tags[k] = fmt.Sprint(v)
}
}
}
// Inherent valid alias from prefix parsing
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
aliasPath = prefixAliasPath
@ -324,32 +385,6 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
}
}
// Update tag lookups and discard rest of update
subscriptionKey := tags["source"] + "/" + tags["name"]
c.lookupMutex.Lock()
if _, ok := c.lookup[name]; ok {
// We are subscribed to this, so add the fields to the lookup-table
if _, ok := c.lookup[name][subscriptionKey]; !ok {
c.lookup[name][subscriptionKey] = make(map[string]interface{})
}
for k, v := range fields {
c.lookup[name][subscriptionKey][path.Base(k)] = v
}
c.lookupMutex.Unlock()
// Do not process the data further as we only subscribed here for the lookup table
continue
}
// Apply lookups if present
for subscriptionName, values := range c.lookup {
if annotations, ok := values[subscriptionKey]; ok {
for k, v := range annotations {
tags[subscriptionName+"/"+k] = fmt.Sprint(v)
}
}
}
c.lookupMutex.Unlock()
// Group metrics
for k, v := range fields {
key := k
@ -386,62 +421,19 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S
// HandleTelemetryField and add it to a measurement
func (c *GNMI) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
gpath, aliasPath, err := c.handlePath(update.Path, tags, prefix)
gpath, aliasPath, err := handlePath(update.Path, tags, c.internalAliases, prefix)
if err != nil {
c.Log.Errorf("handling path %q failed: %v", update.Path, err)
}
var value interface{}
var jsondata []byte
// Make sure a value is actually set
if update.Val == nil || update.Val.Value == nil {
c.Log.Infof("Discarded empty or legacy type value with path: %q", gpath)
return aliasPath, nil
}
switch val := update.Val.Value.(type) {
case *gnmiLib.TypedValue_AsciiVal:
value = val.AsciiVal
case *gnmiLib.TypedValue_BoolVal:
value = val.BoolVal
case *gnmiLib.TypedValue_BytesVal:
value = val.BytesVal
case *gnmiLib.TypedValue_DecimalVal:
value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision))
case *gnmiLib.TypedValue_FloatVal:
value = val.FloatVal
case *gnmiLib.TypedValue_IntVal:
value = val.IntVal
case *gnmiLib.TypedValue_StringVal:
value = val.StringVal
case *gnmiLib.TypedValue_UintVal:
value = val.UintVal
case *gnmiLib.TypedValue_JsonIetfVal:
jsondata = val.JsonIetfVal
case *gnmiLib.TypedValue_JsonVal:
jsondata = val.JsonVal
}
name := strings.ReplaceAll(gpath, "-", "_")
fields := make(map[string]interface{})
if value != nil {
fields[name] = value
} else if jsondata != nil {
if err := json.Unmarshal(jsondata, &value); err != nil {
c.acc.AddError(fmt.Errorf("failed to parse JSON value: %v", err))
} else {
flattener := jsonparser.JSONFlattener{Fields: fields}
if err := flattener.FullFlattenJSON(name, value, true, true); err != nil {
c.acc.AddError(fmt.Errorf("failed to flatten JSON: %v", err))
}
}
fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val)
if err != nil {
c.Log.Errorf("error parsing update value %q: %v", update.Val, err)
}
return aliasPath, fields
}
// Parse path to path-buffer and tag-field
func (c *GNMI) handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) {
func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) {
builder := bytes.NewBufferString(prefix)
// Prefix with origin
@ -466,7 +458,7 @@ func (c *GNMI) handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, prefix
}
name := builder.String()
if _, exists := c.internalAliases[name]; exists {
if _, exists := aliases[name]; exists {
aliasPath = name
}
@ -521,3 +513,232 @@ func init() {
// Backwards compatible alias:
inputs.Add("cisco_telemetry_gnmi", New)
}
func convertTagOnlySubscription(s Subscription) TagSubscription {
t := TagSubscription{Subscription: s, Elements: []string{"interface"}}
return t
}
// equalPathNoKeys checks if two gNMI paths are equal, without keys
func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool {
if len(a.Elem) != len(b.Elem) {
return false
}
for i := range a.Elem {
if a.Elem[i].Name != b.Elem[i].Name {
return false
}
}
return true
}
func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem {
var newPath []*gnmiLib.PathElem
for _, elem := range gpath.Elem {
if elem.Key != nil {
newPath = append(newPath, elem)
}
}
return newPath
}
func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path {
if prefix == nil {
return gpath
}
fullPath := new(gnmiLib.Path)
fullPath.Origin = prefix.Origin
fullPath.Target = prefix.Target
fullPath.Elem = append(prefix.Elem, gpath.Elem...)
return fullPath
}
func (s *Subscription) buildFullPath(c *GNMI) error {
var err error
if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil {
return err
}
s.fullPath.Origin = s.Origin
s.fullPath.Target = c.Target
if c.Prefix != "" {
prefix, err := xpath.ToGNMIPath(c.Prefix)
if err != nil {
return err
}
s.fullPath.Elem = append(prefix.Elem, s.fullPath.Elem...)
if s.Origin == "" && c.Origin != "" {
s.fullPath.Origin = c.Origin
}
}
return nil
}
func (w *Worker) storeTags(update *gnmiLib.Update, sub TagSubscription) {
updateKeys := pathKeys(update.Path)
var foundKey bool
for _, requiredKey := range sub.Elements {
foundKey = false
for _, elem := range updateKeys {
if elem.Name == requiredKey {
foundKey = true
}
}
if !foundKey {
return
}
}
// All required keys present for this TagSubscription
w.tagStore.insert(updateKeys, sub.Name, update.Val)
}
func (node *tagNode) insert(keys []*gnmiLib.PathElem, name string, value *gnmiLib.TypedValue) {
if len(keys) == 0 {
node.value = value
node.tagName = name
return
}
var found *tagNode
key := keys[0]
keyName := key.Name
if node.tagStore == nil {
node.tagStore = make(map[string][]*tagNode)
}
if _, ok := node.tagStore[keyName]; !ok {
node.tagStore[keyName] = make([]*tagNode, 0)
}
for _, node := range node.tagStore[keyName] {
if compareKeys(node.elem.Key, key.Key) {
found = node
break
}
}
if found == nil {
found = &tagNode{elem: keys[0]}
node.tagStore[keyName] = append(node.tagStore[keyName], found)
}
found.insert(keys[1:], name, value)
}
func (node *tagNode) retrieve(keys []*gnmiLib.PathElem, tagResults *tagResults) {
if node.value != nil {
tagResults.names = append(tagResults.names, node.tagName)
tagResults.values = append(tagResults.values, node.value)
}
for _, key := range keys {
if elems, ok := node.tagStore[key.Name]; ok {
for _, node := range elems {
if compareKeys(node.elem.Key, key.Key) {
node.retrieve(keys, tagResults)
}
}
}
}
}
func (w *Worker) checkTags(fullPath *gnmiLib.Path, subscriptions []TagSubscription) map[string]interface{} {
results := &tagResults{}
w.tagStore.retrieve(pathKeys(fullPath), results)
tags := make(map[string]interface{})
for idx := range results.names {
vals, _ := gnmiToFields(results.names[idx], results.values[idx])
for k, v := range vals {
tags[k] = v
}
}
return tags
}
func (s *Subscription) buildAlias(aliases map[string]string) error {
var err error
var gnmiLongPath, gnmiShortPath *gnmiLib.Path
// Build the subscription path without keys
if gnmiLongPath, err = parsePath(s.Origin, s.Path, ""); err != nil {
return err
}
if gnmiShortPath, err = parsePath("", s.Path, ""); err != nil {
return err
}
longPath, _, err := handlePath(gnmiLongPath, nil, nil, "")
if err != nil {
return fmt.Errorf("handling long-path failed: %v", err)
}
shortPath, _, err := handlePath(gnmiShortPath, nil, nil, "")
if err != nil {
return fmt.Errorf("handling short-path failed: %v", err)
}
// If the user didn't provide a measurement name, use last path element
name := s.Name
if len(name) == 0 {
name = path.Base(shortPath)
}
if len(name) > 0 {
aliases[longPath] = name
aliases[shortPath] = name
}
return nil
}
func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) {
var value interface{}
var jsondata []byte
// Make sure a value is actually set
if updateVal == nil || updateVal.Value == nil {
return nil, nil
}
switch val := updateVal.Value.(type) {
case *gnmiLib.TypedValue_AsciiVal:
value = val.AsciiVal
case *gnmiLib.TypedValue_BoolVal:
value = val.BoolVal
case *gnmiLib.TypedValue_BytesVal:
value = val.BytesVal
case *gnmiLib.TypedValue_DecimalVal:
value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision))
case *gnmiLib.TypedValue_FloatVal:
value = val.FloatVal
case *gnmiLib.TypedValue_IntVal:
value = val.IntVal
case *gnmiLib.TypedValue_StringVal:
value = val.StringVal
case *gnmiLib.TypedValue_UintVal:
value = val.UintVal
case *gnmiLib.TypedValue_JsonIetfVal:
jsondata = val.JsonIetfVal
case *gnmiLib.TypedValue_JsonVal:
jsondata = val.JsonVal
}
fields := make(map[string]interface{})
if value != nil {
fields[name] = value
} else if jsondata != nil {
if err := json.Unmarshal(jsondata, &value); err != nil {
return nil, fmt.Errorf("failed to parse JSON value: %v", err)
}
flattener := jsonparser.JSONFlattener{Fields: fields}
if err := flattener.FullFlattenJSON(name, value, true, true); err != nil {
return nil, fmt.Errorf("failed to flatten JSON: %v", err)
}
}
return fields, nil
}
func compareKeys(a map[string]string, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for k, v := range a {
if _, ok := b[k]; !ok {
return false
}
if b[k] != v {
return false
}
}
return true
}

View File

@ -371,7 +371,7 @@ func TestNotification(t *testing.T) {
},
},
{
name: "tagged update pair",
name: "legacy tagged update pair",
plugin: &GNMI{
Log: testutil.Logger{},
Encoding: "proto",
@ -478,10 +478,10 @@ func TestNotification(t *testing.T) {
testutil.MustMetric(
"oc-intf-counters",
map[string]string{
"path": "",
"source": "127.0.0.1",
"name": "Ethernet1",
"oc-intf-desc/description": "foo",
"path": "",
"source": "127.0.0.1",
"name": "Ethernet1",
"oc-intf-desc": "foo",
},
map[string]interface{}{
"in_broadcast_pkts": 42,
@ -490,6 +490,164 @@ func TestNotification(t *testing.T) {
),
},
},
{
name: "iss #11011",
plugin: &GNMI{
Log: testutil.Logger{},
Encoding: "proto",
Redial: config.Duration(1 * time.Second),
TagSubscriptions: []TagSubscription{
{
Subscription: Subscription{
Name: "oc-neigh-desc",
Origin: "openconfig",
Path: "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/description",
SubscriptionMode: "on_change",
},
Elements: []string{"network-instance", "protocol", "neighbor"},
},
},
Subscriptions: []Subscription{
{
Name: "oc-neigh-state",
Origin: "openconfig",
Path: "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/session-state",
SubscriptionMode: "on_change",
},
},
},
server: &MockServer{
SubscribeF: func(server gnmiLib.GNMI_SubscribeServer) error {
tagResponse := &gnmiLib.SubscribeResponse{
Response: &gnmiLib.SubscribeResponse_Update{
Update: &gnmiLib.Notification{
Timestamp: 1543236571000000000,
Prefix: &gnmiLib.Path{},
Update: []*gnmiLib.Update{
{
Path: &gnmiLib.Path{
Origin: "",
Elem: []*gnmiLib.PathElem{
{
Name: "network-instances",
},
{
Name: "network-instance",
Key: map[string]string{"name": "default"},
},
{
Name: "protocols",
},
{
Name: "protocol",
Key: map[string]string{"name": "BGP", "identifier": "BGP"},
},
{
Name: "bgp",
},
{
Name: "neighbors",
},
{
Name: "neighbor",
Key: map[string]string{"neighbor_address": "192.0.2.1"},
},
{
Name: "state",
},
{
Name: "description",
},
},
Target: "",
},
Val: &gnmiLib.TypedValue{
Value: &gnmiLib.TypedValue_StringVal{StringVal: "EXAMPLE-PEER"},
},
},
},
},
},
}
if err := server.Send(tagResponse); err != nil {
return err
}
if err := server.Send(&gnmiLib.SubscribeResponse{Response: &gnmiLib.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
return err
}
taggedResponse := &gnmiLib.SubscribeResponse{
Response: &gnmiLib.SubscribeResponse_Update{
Update: &gnmiLib.Notification{
Timestamp: 1543236572000000000,
Prefix: &gnmiLib.Path{},
Update: []*gnmiLib.Update{
{
Path: &gnmiLib.Path{
Origin: "",
Elem: []*gnmiLib.PathElem{
{
Name: "network-instances",
},
{
Name: "network-instance",
Key: map[string]string{"name": "default"},
},
{
Name: "protocols",
},
{
Name: "protocol",
Key: map[string]string{"name": "BGP", "identifier": "BGP"},
},
{
Name: "bgp",
},
{
Name: "neighbors",
},
{
Name: "neighbor",
Key: map[string]string{"neighbor_address": "192.0.2.1"},
},
{
Name: "state",
},
{
Name: "session-state",
},
},
Target: "",
},
Val: &gnmiLib.TypedValue{
Value: &gnmiLib.TypedValue_StringVal{StringVal: "ESTABLISHED"},
},
},
},
},
},
}
return server.Send(taggedResponse)
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"oc-neigh-state",
map[string]string{
"path": "",
"source": "127.0.0.1",
"neighbor_address": "192.0.2.1",
"name": "default",
"oc-neigh-desc": "EXAMPLE-PEER",
"/network-instances/network-instance/protocols/protocol/name": "BGP",
"identifier": "BGP",
},
map[string]interface{}{
"session_state": "ESTABLISHED",
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
@ -544,7 +702,7 @@ func TestSubscribeResponseError(t *testing.T) {
plugin := &GNMI{Log: ml}
// TODO: FIX SA1019: gnmi.Error is deprecated: Do not use.
errorResponse := &gnmiLib.SubscribeResponse_Error{Error: &gnmiLib.Error{Message: me, Code: mc}}
plugin.handleSubscribeResponse("127.0.0.1:0", &gnmiLib.SubscribeResponse{Response: errorResponse})
plugin.handleSubscribeResponse(&Worker{address: "127.0.0.1:0"}, &gnmiLib.SubscribeResponse{Response: errorResponse})
require.NotEmpty(t, ml.lastFormat)
require.Equal(t, []interface{}{mc, me}, ml.lastArgs)
}
@ -615,3 +773,194 @@ func TestRedial(t *testing.T) {
grpcServer.Stop()
wg.Wait()
}
func TestTagNode(t *testing.T) {
type insertOp struct {
keys []*gnmiLib.PathElem
name string
value *gnmiLib.TypedValue
}
interfaceElemSingleKey := &gnmiLib.PathElem{
Name: "interface",
Key: map[string]string{"name": "Management0"},
}
networkInstanceSingleKey := &gnmiLib.PathElem{
Name: "network-instance",
Key: map[string]string{"name": "default"},
}
protocolDoubleKey := &gnmiLib.PathElem{
Name: "protocol",
Key: map[string]string{"name": "BGP", "protocol": "BGP"},
}
neighborSingleKey := &gnmiLib.PathElem{
Name: "neighbor",
Key: map[string]string{"neighbor_address": "192.0.2.1"},
}
tests := []struct {
name string
insertOps []insertOp
expected *tagNode
}{
{
name: "single elem single key insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{interfaceElemSingleKey},
name: "tagFoo",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"interface": {
{
elem: interfaceElemSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}},
tagName: "tagFoo",
},
},
},
},
},
{
name: "double elem single key insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{interfaceElemSingleKey, networkInstanceSingleKey},
name: "tagBar",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"interface": {
{
elem: interfaceElemSingleKey,
tagStore: map[string][]*tagNode{
"network-instance": {
{
elem: networkInstanceSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}},
tagName: "tagBar",
},
},
},
},
},
},
},
},
{
name: "single elem double key insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{protocolDoubleKey},
name: "doubleKey",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"protocol": {
{
elem: protocolDoubleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}},
tagName: "doubleKey",
},
},
},
},
},
{
name: "multi elem unrelated insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{interfaceElemSingleKey},
name: "intf_desc",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}},
},
{
keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey, neighborSingleKey},
name: "bgp_neigh_desc",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"interface": {
{
elem: interfaceElemSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}},
tagName: "intf_desc",
},
},
"network-instance": {
{
elem: networkInstanceSingleKey,
tagStore: map[string][]*tagNode{
"protocol": {
{
elem: protocolDoubleKey,
tagStore: map[string][]*tagNode{
"neighbor": {
{
elem: neighborSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}},
tagName: "bgp_neigh_desc",
},
},
},
},
},
},
},
},
},
},
},
{
name: "values at multiple levels",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{networkInstanceSingleKey},
name: "vrf_stuff",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}},
},
{
keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey},
name: "protocol_stuff",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"network-instance": {
{
elem: networkInstanceSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}},
tagName: "vrf_stuff",
tagStore: map[string][]*tagNode{
"protocol": {
{
elem: protocolDoubleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}},
tagName: "protocol_stuff",
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rootNode := new(tagNode)
for _, s := range tt.insertOps {
rootNode.insert(s.keys, s.name, s.value)
}
require.Equal(t, rootNode, tt.expected)
})
}
}