From f29f7b28f2a3d547e235bb89523ea7bbcd74861c Mon Sep 17 00:00:00 2001 From: bewing Date: Thu, 7 Jul 2022 13:50:40 -0500 Subject: [PATCH] fix(gnmi): refactor tag-only subs for complex keys (#11011) --- plugins/inputs/gnmi/README.md | 24 +- plugins/inputs/gnmi/gnmi.go | 511 ++++++++++++++++++++++--------- plugins/inputs/gnmi/gnmi_test.go | 361 +++++++++++++++++++++- 3 files changed, 734 insertions(+), 162 deletions(-) diff --git a/plugins/inputs/gnmi/README.md b/plugins/inputs/gnmi/README.md index ae42ebffd..2956043bd 100644 --- a/plugins/inputs/gnmi/README.md +++ b/plugins/inputs/gnmi/README.md @@ -58,7 +58,7 @@ It has been optimized to support gNMI telemetry as produced by Cisco IOS XR ## origin usually refers to a (YANG) data model implemented by the device ## and path to a specific substructure inside it that should be subscribed to (similar to an XPath) ## YANG models can be found e.g. here: https://github.com/YangModels/yang/tree/master/vendor/cisco/xr - origin = "openconfig-interfaces" + origin = "openconfig" path = "/interfaces/interface/state/counters" # Subscription mode (one of: "target_defined", "sample", "on_change") and interval @@ -71,17 +71,19 @@ It has been optimized to support gNMI telemetry as produced by Cisco IOS XR ## If suppression is enabled, send updates at least every X seconds anyway # heartbeat_interval = "60s" - #[[inputs.gnmi.subscription]] - # name = "descr" - # origin = "openconfig-interfaces" - # path = "/interfaces/interface/state/description" - # subscription_mode = "on_change" + ## Tag subscriptions are subscriptions to paths intended to be applied as tags to other subscriptions + [[inputs.gnmi.tag_subscription]] + # When applying this value as a tag to other metrics, use this tag name + name = "descr" + # All other subscription fields are as normal + origin = "openconfig" + path = "/interfaces/interface/state/description" + subscription_mode = "on_change" + # At least one path element name must be supplied that contains at least one key to match on + # Multiple element names can be specified in any order - all element names must be present and contain + # to be stored as an in-memory tag + elements = ["interface"] - ## If tag_only is set, the subscription in question will be utilized to maintain a map of - ## tags to apply to other measurements emitted by the plugin, by matching path keys - ## All fields from the tag-only subscription will be applied as tags to other readings, - ## in the format _. - # tag_only = true ``` ## Example Output diff --git a/plugins/inputs/gnmi/gnmi.go b/plugins/inputs/gnmi/gnmi.go index b7b8bc22b..936235fd2 100644 --- a/plugins/inputs/gnmi/gnmi.go +++ b/plugins/inputs/gnmi/gnmi.go @@ -36,9 +36,10 @@ var sampleConfig string // gNMI plugin instance type GNMI struct { - Addresses []string `toml:"addresses"` - Subscriptions []Subscription `toml:"subscription"` - Aliases map[string]string `toml:"aliases"` + Addresses []string `toml:"addresses"` + Subscriptions []Subscription `toml:"subscription"` + TagSubscriptions []TagSubscription `toml:"tag_subscription"` + Aliases map[string]string `toml:"aliases"` // Optional subscription configuration Encoding string @@ -63,19 +64,36 @@ type GNMI struct { acc telegraf.Accumulator cancel context.CancelFunc wg sync.WaitGroup - // Lookup/device+name/key/value - lookup map[string]map[string]map[string]interface{} - lookupMutex sync.Mutex + legacyTags bool Log telegraf.Logger } +type Worker struct { + address string + tagStore *tagNode +} + +type tagNode struct { + elem *gnmiLib.PathElem + tagName string + value *gnmiLib.TypedValue + tagStore map[string][]*tagNode +} + +type tagResults struct { + names []string + values []*gnmiLib.TypedValue +} + // Subscription for a gNMI client type Subscription struct { Name string Origin string Path string + fullPath *gnmiLib.Path + // Subscription mode and interval SubscriptionMode string `toml:"subscription_mode"` SampleInterval config.Duration `toml:"sample_interval"` @@ -88,6 +106,12 @@ type Subscription struct { TagOnly bool `toml:"tag_only"` } +// Tag Subscription for a gNMI client +type TagSubscription struct { + Subscription + Elements []string +} + func (*GNMI) SampleConfig() string { return sampleConfig } @@ -100,9 +124,33 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { var request *gnmiLib.SubscribeRequest c.acc = acc ctx, c.cancel = context.WithCancel(context.Background()) - c.lookupMutex.Lock() - c.lookup = make(map[string]map[string]map[string]interface{}) - c.lookupMutex.Unlock() + + for i := len(c.Subscriptions) - 1; i >= 0; i-- { + subscription := c.Subscriptions[i] + // Support legacy TagOnly subscriptions + if subscription.TagOnly { + tagSub := convertTagOnlySubscription(subscription) + c.TagSubscriptions = append(c.TagSubscriptions, tagSub) + // Remove from the original subscriptions list + c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...) + c.legacyTags = true + continue + } + if err = subscription.buildFullPath(c); err != nil { + return err + } + } + for idx := range c.TagSubscriptions { + if err = c.TagSubscriptions[idx].buildFullPath(c); err != nil { + return err + } + if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly { + return fmt.Errorf("do not mix legacy tag_only subscriptions and tag subscriptions") + } + if len(c.TagSubscriptions[idx].Elements) == 0 { + return fmt.Errorf("tag_subscription must have at least one element") + } + } // Validate configuration if request, err = c.newSubscribeRequest(); err != nil { @@ -123,44 +171,19 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { } // Invert explicit alias list and prefill subscription names - c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)) - for _, subscription := range c.Subscriptions { - var gnmiLongPath, gnmiShortPath *gnmiLib.Path + c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions)) - // Build the subscription path without keys - if gnmiLongPath, err = parsePath(subscription.Origin, subscription.Path, ""); err != nil { + for _, s := range c.Subscriptions { + if err := s.buildAlias(c.internalAliases); err != nil { return err } - if gnmiShortPath, err = parsePath("", subscription.Path, ""); err != nil { - return err - } - - longPath, _, err := c.handlePath(gnmiLongPath, nil, "") - if err != nil { - return fmt.Errorf("handling long-path failed: %v", err) - } - shortPath, _, err := c.handlePath(gnmiShortPath, nil, "") - if err != nil { - return fmt.Errorf("handling short-path failed: %v", err) - } - name := subscription.Name - - // If the user didn't provide a measurement name, use last path element - if len(name) == 0 { - name = path.Base(shortPath) - } - if len(name) > 0 { - c.internalAliases[longPath] = name - c.internalAliases[shortPath] = name - } - - if subscription.TagOnly { - // Create the top-level lookup for this tag - c.lookupMutex.Lock() - c.lookup[name] = make(map[string]map[string]interface{}) - c.lookupMutex.Unlock() - } } + for _, s := range c.TagSubscriptions { + if err := s.buildAlias(c.internalAliases); err != nil { + return err + } + } + for alias, encodingPath := range c.Aliases { c.internalAliases[encodingPath] = alias } @@ -168,10 +191,12 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { // Create a goroutine for each device, dial and subscribe c.wg.Add(len(c.Addresses)) for _, addr := range c.Addresses { - go func(address string) { + worker := Worker{address: addr} + worker.tagStore = &tagNode{} + go func(worker Worker) { defer c.wg.Done() for ctx.Err() == nil { - if err := c.subscribeGNMI(ctx, address, tlscfg, request); err != nil && ctx.Err() == nil { + if err := c.subscribeGNMI(ctx, &worker, tlscfg, request); err != nil && ctx.Err() == nil { acc.AddError(err) } @@ -180,30 +205,42 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { case <-time.After(time.Duration(c.Redial)): } } - }(addr) + }(worker) } return nil } +func (s *Subscription) buildSubscription() (*gnmiLib.Subscription, error) { + gnmiPath, err := parsePath(s.Origin, s.Path, "") + if err != nil { + return nil, err + } + mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(s.SubscriptionMode)] + if !ok { + return nil, fmt.Errorf("invalid subscription mode %s", s.SubscriptionMode) + } + return &gnmiLib.Subscription{ + Path: gnmiPath, + Mode: gnmiLib.SubscriptionMode(mode), + HeartbeatInterval: uint64(time.Duration(s.HeartbeatInterval).Nanoseconds()), + SampleInterval: uint64(time.Duration(s.SampleInterval).Nanoseconds()), + SuppressRedundant: s.SuppressRedundant, + }, nil +} + // Create a new gNMI SubscribeRequest func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) { // Create subscription objects - subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions)) - for i, subscription := range c.Subscriptions { - gnmiPath, err := parsePath(subscription.Origin, subscription.Path, "") - if err != nil { + var err error + subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions)+len(c.TagSubscriptions)) + for i, subscription := range c.TagSubscriptions { + if subscriptions[i], err = subscription.buildSubscription(); err != nil { return nil, err } - mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(subscription.SubscriptionMode)] - if !ok { - return nil, fmt.Errorf("invalid subscription mode %s", subscription.SubscriptionMode) - } - subscriptions[i] = &gnmiLib.Subscription{ - Path: gnmiPath, - Mode: gnmiLib.SubscriptionMode(mode), - SampleInterval: uint64(time.Duration(subscription.SampleInterval).Nanoseconds()), - SuppressRedundant: subscription.SuppressRedundant, - HeartbeatInterval: uint64(time.Duration(subscription.HeartbeatInterval).Nanoseconds()), + } + for i, subscription := range c.Subscriptions { + if subscriptions[i+len(c.TagSubscriptions)], err = subscription.buildSubscription(); err != nil { + return nil, err } } @@ -231,7 +268,7 @@ func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) { } // SubscribeGNMI and extract telemetry data -func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { +func (c *GNMI) subscribeGNMI(ctx context.Context, worker *Worker, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { var opt grpc.DialOption if tlscfg != nil { opt = grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)) @@ -239,7 +276,7 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co opt = grpc.WithInsecure() } - client, err := grpc.DialContext(ctx, address, opt) + client, err := grpc.DialContext(ctx, worker.address, opt) if err != nil { return fmt.Errorf("failed to dial: %v", err) } @@ -258,8 +295,8 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co } } - c.Log.Debugf("Connection to gNMI device %s established", address) - defer c.Log.Debugf("Connection to gNMI device %s closed", address) + c.Log.Debugf("Connection to gNMI device %s established", worker.address) + defer c.Log.Debugf("Connection to gNMI device %s closed", worker.address) for ctx.Err() == nil { var reply *gnmiLib.SubscribeResponse if reply, err = subscribeClient.Recv(); err != nil { @@ -269,22 +306,22 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co break } - c.handleSubscribeResponse(address, reply) + c.handleSubscribeResponse(worker, reply) } return nil } -func (c *GNMI) handleSubscribeResponse(address string, reply *gnmiLib.SubscribeResponse) { +func (c *GNMI) handleSubscribeResponse(worker *Worker, reply *gnmiLib.SubscribeResponse) { switch response := reply.Response.(type) { case *gnmiLib.SubscribeResponse_Update: - c.handleSubscribeResponseUpdate(address, response) + c.handleSubscribeResponseUpdate(worker, response) case *gnmiLib.SubscribeResponse_Error: c.Log.Errorf("Subscribe error (%d), %q", response.Error.Code, response.Error.Message) } } // Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data -func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.SubscribeResponse_Update) { +func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.SubscribeResponse_Update) { var prefix, prefixAliasPath string grouper := metric.NewSeriesGrouper() timestamp := time.Unix(0, response.Update.Timestamp) @@ -292,16 +329,30 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S if response.Update.Prefix != nil { var err error - if prefix, prefixAliasPath, err = c.handlePath(response.Update.Prefix, prefixTags, ""); err != nil { + if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, c.internalAliases, ""); err != nil { c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err) } } - prefixTags["source"], _, _ = net.SplitHostPort(address) + prefixTags["source"], _, _ = net.SplitHostPort(worker.address) prefixTags["path"] = prefix + // Process and remove tag-only updates from the response + for i := len(response.Update.Update) - 1; i >= 0; i-- { + update := response.Update.Update[i] + fullPath := pathWithPrefix(response.Update.Prefix, update.Path) + for _, tagSub := range c.TagSubscriptions { + if equalPathNoKeys(fullPath, tagSub.fullPath) { + worker.storeTags(update, tagSub) + response.Update.Update = append(response.Update.Update[:i], response.Update.Update[i+1:]...) + } + } + } + // Parse individual Update message and create measurements var name, lastAliasPath string for _, update := range response.Update.Update { + fullPath := pathWithPrefix(response.Update.Prefix, update.Path) + // Prepare tags from prefix tags := make(map[string]string, len(prefixTags)) for key, val := range prefixTags { @@ -309,6 +360,16 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S } aliasPath, fields := c.handleTelemetryField(update, tags, prefix) + if tagOnlyTags := worker.checkTags(fullPath, c.TagSubscriptions); tagOnlyTags != nil { + for k, v := range tagOnlyTags { + if alias, ok := c.internalAliases[k]; ok { + tags[alias] = fmt.Sprint(v) + } else { + tags[k] = fmt.Sprint(v) + } + } + } + // Inherent valid alias from prefix parsing if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { aliasPath = prefixAliasPath @@ -324,32 +385,6 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S } } - // Update tag lookups and discard rest of update - subscriptionKey := tags["source"] + "/" + tags["name"] - c.lookupMutex.Lock() - if _, ok := c.lookup[name]; ok { - // We are subscribed to this, so add the fields to the lookup-table - if _, ok := c.lookup[name][subscriptionKey]; !ok { - c.lookup[name][subscriptionKey] = make(map[string]interface{}) - } - for k, v := range fields { - c.lookup[name][subscriptionKey][path.Base(k)] = v - } - c.lookupMutex.Unlock() - // Do not process the data further as we only subscribed here for the lookup table - continue - } - - // Apply lookups if present - for subscriptionName, values := range c.lookup { - if annotations, ok := values[subscriptionKey]; ok { - for k, v := range annotations { - tags[subscriptionName+"/"+k] = fmt.Sprint(v) - } - } - } - c.lookupMutex.Unlock() - // Group metrics for k, v := range fields { key := k @@ -386,62 +421,19 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.S // HandleTelemetryField and add it to a measurement func (c *GNMI) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { - gpath, aliasPath, err := c.handlePath(update.Path, tags, prefix) + gpath, aliasPath, err := handlePath(update.Path, tags, c.internalAliases, prefix) if err != nil { c.Log.Errorf("handling path %q failed: %v", update.Path, err) } - - var value interface{} - var jsondata []byte - - // Make sure a value is actually set - if update.Val == nil || update.Val.Value == nil { - c.Log.Infof("Discarded empty or legacy type value with path: %q", gpath) - return aliasPath, nil - } - - switch val := update.Val.Value.(type) { - case *gnmiLib.TypedValue_AsciiVal: - value = val.AsciiVal - case *gnmiLib.TypedValue_BoolVal: - value = val.BoolVal - case *gnmiLib.TypedValue_BytesVal: - value = val.BytesVal - case *gnmiLib.TypedValue_DecimalVal: - value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision)) - case *gnmiLib.TypedValue_FloatVal: - value = val.FloatVal - case *gnmiLib.TypedValue_IntVal: - value = val.IntVal - case *gnmiLib.TypedValue_StringVal: - value = val.StringVal - case *gnmiLib.TypedValue_UintVal: - value = val.UintVal - case *gnmiLib.TypedValue_JsonIetfVal: - jsondata = val.JsonIetfVal - case *gnmiLib.TypedValue_JsonVal: - jsondata = val.JsonVal - } - - name := strings.ReplaceAll(gpath, "-", "_") - fields := make(map[string]interface{}) - if value != nil { - fields[name] = value - } else if jsondata != nil { - if err := json.Unmarshal(jsondata, &value); err != nil { - c.acc.AddError(fmt.Errorf("failed to parse JSON value: %v", err)) - } else { - flattener := jsonparser.JSONFlattener{Fields: fields} - if err := flattener.FullFlattenJSON(name, value, true, true); err != nil { - c.acc.AddError(fmt.Errorf("failed to flatten JSON: %v", err)) - } - } + fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val) + if err != nil { + c.Log.Errorf("error parsing update value %q: %v", update.Val, err) } return aliasPath, fields } // Parse path to path-buffer and tag-field -func (c *GNMI) handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) { +func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) { builder := bytes.NewBufferString(prefix) // Prefix with origin @@ -466,7 +458,7 @@ func (c *GNMI) handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, prefix } name := builder.String() - if _, exists := c.internalAliases[name]; exists { + if _, exists := aliases[name]; exists { aliasPath = name } @@ -521,3 +513,232 @@ func init() { // Backwards compatible alias: inputs.Add("cisco_telemetry_gnmi", New) } + +func convertTagOnlySubscription(s Subscription) TagSubscription { + t := TagSubscription{Subscription: s, Elements: []string{"interface"}} + return t +} + +// equalPathNoKeys checks if two gNMI paths are equal, without keys +func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool { + if len(a.Elem) != len(b.Elem) { + return false + } + for i := range a.Elem { + if a.Elem[i].Name != b.Elem[i].Name { + return false + } + } + return true +} + +func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem { + var newPath []*gnmiLib.PathElem + for _, elem := range gpath.Elem { + if elem.Key != nil { + newPath = append(newPath, elem) + } + } + return newPath +} + +func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path { + if prefix == nil { + return gpath + } + fullPath := new(gnmiLib.Path) + fullPath.Origin = prefix.Origin + fullPath.Target = prefix.Target + fullPath.Elem = append(prefix.Elem, gpath.Elem...) + return fullPath +} + +func (s *Subscription) buildFullPath(c *GNMI) error { + var err error + if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil { + return err + } + s.fullPath.Origin = s.Origin + s.fullPath.Target = c.Target + if c.Prefix != "" { + prefix, err := xpath.ToGNMIPath(c.Prefix) + if err != nil { + return err + } + s.fullPath.Elem = append(prefix.Elem, s.fullPath.Elem...) + if s.Origin == "" && c.Origin != "" { + s.fullPath.Origin = c.Origin + } + } + return nil +} + +func (w *Worker) storeTags(update *gnmiLib.Update, sub TagSubscription) { + updateKeys := pathKeys(update.Path) + var foundKey bool + for _, requiredKey := range sub.Elements { + foundKey = false + for _, elem := range updateKeys { + if elem.Name == requiredKey { + foundKey = true + } + } + if !foundKey { + return + } + } + // All required keys present for this TagSubscription + w.tagStore.insert(updateKeys, sub.Name, update.Val) +} + +func (node *tagNode) insert(keys []*gnmiLib.PathElem, name string, value *gnmiLib.TypedValue) { + if len(keys) == 0 { + node.value = value + node.tagName = name + return + } + var found *tagNode + key := keys[0] + keyName := key.Name + if node.tagStore == nil { + node.tagStore = make(map[string][]*tagNode) + } + if _, ok := node.tagStore[keyName]; !ok { + node.tagStore[keyName] = make([]*tagNode, 0) + } + for _, node := range node.tagStore[keyName] { + if compareKeys(node.elem.Key, key.Key) { + found = node + break + } + } + if found == nil { + found = &tagNode{elem: keys[0]} + node.tagStore[keyName] = append(node.tagStore[keyName], found) + } + found.insert(keys[1:], name, value) +} + +func (node *tagNode) retrieve(keys []*gnmiLib.PathElem, tagResults *tagResults) { + if node.value != nil { + tagResults.names = append(tagResults.names, node.tagName) + tagResults.values = append(tagResults.values, node.value) + } + for _, key := range keys { + if elems, ok := node.tagStore[key.Name]; ok { + for _, node := range elems { + if compareKeys(node.elem.Key, key.Key) { + node.retrieve(keys, tagResults) + } + } + } + } +} + +func (w *Worker) checkTags(fullPath *gnmiLib.Path, subscriptions []TagSubscription) map[string]interface{} { + results := &tagResults{} + w.tagStore.retrieve(pathKeys(fullPath), results) + tags := make(map[string]interface{}) + for idx := range results.names { + vals, _ := gnmiToFields(results.names[idx], results.values[idx]) + for k, v := range vals { + tags[k] = v + } + } + return tags +} + +func (s *Subscription) buildAlias(aliases map[string]string) error { + var err error + var gnmiLongPath, gnmiShortPath *gnmiLib.Path + + // Build the subscription path without keys + if gnmiLongPath, err = parsePath(s.Origin, s.Path, ""); err != nil { + return err + } + if gnmiShortPath, err = parsePath("", s.Path, ""); err != nil { + return err + } + + longPath, _, err := handlePath(gnmiLongPath, nil, nil, "") + if err != nil { + return fmt.Errorf("handling long-path failed: %v", err) + } + shortPath, _, err := handlePath(gnmiShortPath, nil, nil, "") + if err != nil { + return fmt.Errorf("handling short-path failed: %v", err) + } + + // If the user didn't provide a measurement name, use last path element + name := s.Name + if len(name) == 0 { + name = path.Base(shortPath) + } + if len(name) > 0 { + aliases[longPath] = name + aliases[shortPath] = name + } + return nil +} + +func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) { + var value interface{} + var jsondata []byte + + // Make sure a value is actually set + if updateVal == nil || updateVal.Value == nil { + return nil, nil + } + + switch val := updateVal.Value.(type) { + case *gnmiLib.TypedValue_AsciiVal: + value = val.AsciiVal + case *gnmiLib.TypedValue_BoolVal: + value = val.BoolVal + case *gnmiLib.TypedValue_BytesVal: + value = val.BytesVal + case *gnmiLib.TypedValue_DecimalVal: + value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision)) + case *gnmiLib.TypedValue_FloatVal: + value = val.FloatVal + case *gnmiLib.TypedValue_IntVal: + value = val.IntVal + case *gnmiLib.TypedValue_StringVal: + value = val.StringVal + case *gnmiLib.TypedValue_UintVal: + value = val.UintVal + case *gnmiLib.TypedValue_JsonIetfVal: + jsondata = val.JsonIetfVal + case *gnmiLib.TypedValue_JsonVal: + jsondata = val.JsonVal + } + + fields := make(map[string]interface{}) + if value != nil { + fields[name] = value + } else if jsondata != nil { + if err := json.Unmarshal(jsondata, &value); err != nil { + return nil, fmt.Errorf("failed to parse JSON value: %v", err) + } + flattener := jsonparser.JSONFlattener{Fields: fields} + if err := flattener.FullFlattenJSON(name, value, true, true); err != nil { + return nil, fmt.Errorf("failed to flatten JSON: %v", err) + } + } + return fields, nil +} + +func compareKeys(a map[string]string, b map[string]string) bool { + if len(a) != len(b) { + return false + } + for k, v := range a { + if _, ok := b[k]; !ok { + return false + } + if b[k] != v { + return false + } + } + return true +} diff --git a/plugins/inputs/gnmi/gnmi_test.go b/plugins/inputs/gnmi/gnmi_test.go index e2078a676..6e3fcac86 100644 --- a/plugins/inputs/gnmi/gnmi_test.go +++ b/plugins/inputs/gnmi/gnmi_test.go @@ -371,7 +371,7 @@ func TestNotification(t *testing.T) { }, }, { - name: "tagged update pair", + name: "legacy tagged update pair", plugin: &GNMI{ Log: testutil.Logger{}, Encoding: "proto", @@ -478,10 +478,10 @@ func TestNotification(t *testing.T) { testutil.MustMetric( "oc-intf-counters", map[string]string{ - "path": "", - "source": "127.0.0.1", - "name": "Ethernet1", - "oc-intf-desc/description": "foo", + "path": "", + "source": "127.0.0.1", + "name": "Ethernet1", + "oc-intf-desc": "foo", }, map[string]interface{}{ "in_broadcast_pkts": 42, @@ -490,6 +490,164 @@ func TestNotification(t *testing.T) { ), }, }, + { + name: "iss #11011", + plugin: &GNMI{ + Log: testutil.Logger{}, + Encoding: "proto", + Redial: config.Duration(1 * time.Second), + TagSubscriptions: []TagSubscription{ + { + Subscription: Subscription{ + Name: "oc-neigh-desc", + Origin: "openconfig", + Path: "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/description", + SubscriptionMode: "on_change", + }, + Elements: []string{"network-instance", "protocol", "neighbor"}, + }, + }, + Subscriptions: []Subscription{ + { + Name: "oc-neigh-state", + Origin: "openconfig", + Path: "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/session-state", + SubscriptionMode: "on_change", + }, + }, + }, + server: &MockServer{ + SubscribeF: func(server gnmiLib.GNMI_SubscribeServer) error { + tagResponse := &gnmiLib.SubscribeResponse{ + Response: &gnmiLib.SubscribeResponse_Update{ + Update: &gnmiLib.Notification{ + Timestamp: 1543236571000000000, + Prefix: &gnmiLib.Path{}, + Update: []*gnmiLib.Update{ + { + Path: &gnmiLib.Path{ + Origin: "", + Elem: []*gnmiLib.PathElem{ + { + Name: "network-instances", + }, + { + Name: "network-instance", + Key: map[string]string{"name": "default"}, + }, + { + Name: "protocols", + }, + { + Name: "protocol", + Key: map[string]string{"name": "BGP", "identifier": "BGP"}, + }, + { + Name: "bgp", + }, + { + Name: "neighbors", + }, + { + Name: "neighbor", + Key: map[string]string{"neighbor_address": "192.0.2.1"}, + }, + { + Name: "state", + }, + { + Name: "description", + }, + }, + Target: "", + }, + Val: &gnmiLib.TypedValue{ + Value: &gnmiLib.TypedValue_StringVal{StringVal: "EXAMPLE-PEER"}, + }, + }, + }, + }, + }, + } + if err := server.Send(tagResponse); err != nil { + return err + } + if err := server.Send(&gnmiLib.SubscribeResponse{Response: &gnmiLib.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil { + return err + } + taggedResponse := &gnmiLib.SubscribeResponse{ + Response: &gnmiLib.SubscribeResponse_Update{ + Update: &gnmiLib.Notification{ + Timestamp: 1543236572000000000, + Prefix: &gnmiLib.Path{}, + Update: []*gnmiLib.Update{ + { + Path: &gnmiLib.Path{ + Origin: "", + Elem: []*gnmiLib.PathElem{ + { + Name: "network-instances", + }, + { + Name: "network-instance", + Key: map[string]string{"name": "default"}, + }, + { + Name: "protocols", + }, + { + Name: "protocol", + Key: map[string]string{"name": "BGP", "identifier": "BGP"}, + }, + { + Name: "bgp", + }, + { + Name: "neighbors", + }, + { + Name: "neighbor", + Key: map[string]string{"neighbor_address": "192.0.2.1"}, + }, + { + Name: "state", + }, + { + Name: "session-state", + }, + }, + Target: "", + }, + Val: &gnmiLib.TypedValue{ + Value: &gnmiLib.TypedValue_StringVal{StringVal: "ESTABLISHED"}, + }, + }, + }, + }, + }, + } + return server.Send(taggedResponse) + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "oc-neigh-state", + map[string]string{ + "path": "", + "source": "127.0.0.1", + "neighbor_address": "192.0.2.1", + "name": "default", + "oc-neigh-desc": "EXAMPLE-PEER", + "/network-instances/network-instance/protocols/protocol/name": "BGP", + "identifier": "BGP", + }, + map[string]interface{}{ + "session_state": "ESTABLISHED", + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests { @@ -544,7 +702,7 @@ func TestSubscribeResponseError(t *testing.T) { plugin := &GNMI{Log: ml} // TODO: FIX SA1019: gnmi.Error is deprecated: Do not use. errorResponse := &gnmiLib.SubscribeResponse_Error{Error: &gnmiLib.Error{Message: me, Code: mc}} - plugin.handleSubscribeResponse("127.0.0.1:0", &gnmiLib.SubscribeResponse{Response: errorResponse}) + plugin.handleSubscribeResponse(&Worker{address: "127.0.0.1:0"}, &gnmiLib.SubscribeResponse{Response: errorResponse}) require.NotEmpty(t, ml.lastFormat) require.Equal(t, []interface{}{mc, me}, ml.lastArgs) } @@ -615,3 +773,194 @@ func TestRedial(t *testing.T) { grpcServer.Stop() wg.Wait() } + +func TestTagNode(t *testing.T) { + type insertOp struct { + keys []*gnmiLib.PathElem + name string + value *gnmiLib.TypedValue + } + interfaceElemSingleKey := &gnmiLib.PathElem{ + Name: "interface", + Key: map[string]string{"name": "Management0"}, + } + networkInstanceSingleKey := &gnmiLib.PathElem{ + Name: "network-instance", + Key: map[string]string{"name": "default"}, + } + protocolDoubleKey := &gnmiLib.PathElem{ + Name: "protocol", + Key: map[string]string{"name": "BGP", "protocol": "BGP"}, + } + neighborSingleKey := &gnmiLib.PathElem{ + Name: "neighbor", + Key: map[string]string{"neighbor_address": "192.0.2.1"}, + } + tests := []struct { + name string + insertOps []insertOp + expected *tagNode + }{ + { + name: "single elem single key insert", + insertOps: []insertOp{ + { + keys: []*gnmiLib.PathElem{interfaceElemSingleKey}, + name: "tagFoo", + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}}, + }, + }, + expected: &tagNode{ + tagStore: map[string][]*tagNode{ + "interface": { + { + elem: interfaceElemSingleKey, + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}}, + tagName: "tagFoo", + }, + }, + }, + }, + }, + { + name: "double elem single key insert", + insertOps: []insertOp{ + { + keys: []*gnmiLib.PathElem{interfaceElemSingleKey, networkInstanceSingleKey}, + name: "tagBar", + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}}, + }, + }, + expected: &tagNode{ + tagStore: map[string][]*tagNode{ + "interface": { + { + elem: interfaceElemSingleKey, + tagStore: map[string][]*tagNode{ + "network-instance": { + { + elem: networkInstanceSingleKey, + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}}, + tagName: "tagBar", + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "single elem double key insert", + insertOps: []insertOp{ + { + keys: []*gnmiLib.PathElem{protocolDoubleKey}, + name: "doubleKey", + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}}, + }, + }, + expected: &tagNode{ + tagStore: map[string][]*tagNode{ + "protocol": { + { + elem: protocolDoubleKey, + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}}, + tagName: "doubleKey", + }, + }, + }, + }, + }, + { + name: "multi elem unrelated insert", + insertOps: []insertOp{ + { + keys: []*gnmiLib.PathElem{interfaceElemSingleKey}, + name: "intf_desc", + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}}, + }, + { + keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey, neighborSingleKey}, + name: "bgp_neigh_desc", + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}}, + }, + }, + expected: &tagNode{ + tagStore: map[string][]*tagNode{ + "interface": { + { + elem: interfaceElemSingleKey, + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}}, + tagName: "intf_desc", + }, + }, + "network-instance": { + { + elem: networkInstanceSingleKey, + tagStore: map[string][]*tagNode{ + "protocol": { + { + elem: protocolDoubleKey, + tagStore: map[string][]*tagNode{ + "neighbor": { + { + elem: neighborSingleKey, + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}}, + tagName: "bgp_neigh_desc", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "values at multiple levels", + insertOps: []insertOp{ + { + keys: []*gnmiLib.PathElem{networkInstanceSingleKey}, + name: "vrf_stuff", + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}}, + }, + { + keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey}, + name: "protocol_stuff", + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}}, + }, + }, + expected: &tagNode{ + tagStore: map[string][]*tagNode{ + "network-instance": { + { + elem: networkInstanceSingleKey, + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}}, + tagName: "vrf_stuff", + tagStore: map[string][]*tagNode{ + "protocol": { + { + elem: protocolDoubleKey, + value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}}, + tagName: "protocol_stuff", + }, + }, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rootNode := new(tagNode) + for _, s := range tt.insertOps { + rootNode.insert(s.keys, s.name, s.value) + } + require.Equal(t, rootNode, tt.expected) + }) + } +}