diff --git a/plugins/inputs/gnmi/README.md b/plugins/inputs/gnmi/README.md index 6dc94b786..45f30f86f 100644 --- a/plugins/inputs/gnmi/README.md +++ b/plugins/inputs/gnmi/README.md @@ -91,14 +91,29 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # [[inputs.gnmi.tag_subscription]] # ## When applying this value as a tag to other metrics, use this tag name # name = "descr" + # # ## All other subscription fields are as normal # origin = "openconfig-interfaces" # path = "/interfaces/interface/state" # subscription_mode = "on_change" - # ## At least one path element name must be supplied that contains at least - # ## one key to match on. Multiple element names can be specified in any - # ## order. In this case all element names must be present. - # elements = ["description", "interface"] + # + # ## Match strategy to use for the tag. + # ## Tags are only applied for metrics of the same address. The following + # ## settings are valid: + # ## unconditional -- always match + # ## name -- match by the "name" key + # ## This resembles the previsou 'tag-only' behavior. + # ## elements -- match by the keys in the path filtered by the path + # ## parts specified `elements` below + # ## By default, 'elements' is used if the 'elements' option is provided, + # ## otherwise match by 'name'. + # # match = "" + # + # ## For the 'elements' match strategy, at least one path-element name must + # ## be supplied containing at least one key to match on. Multiple path + # ## elements can be specified in any order. All given keys must be equal + # ## for a match. + # # elements = ["description", "interface"] ``` ## Metrics diff --git a/plugins/inputs/gnmi/gnmi.go b/plugins/inputs/gnmi/gnmi.go index a1401b595..124d23c8e 100644 --- a/plugins/inputs/gnmi/gnmi.go +++ b/plugins/inputs/gnmi/gnmi.go @@ -2,15 +2,10 @@ package gnmi import ( - "bytes" "context" "crypto/tls" _ "embed" - "encoding/json" "fmt" - "io" - "math" - "net" "path" "regexp" "strings" @@ -19,17 +14,12 @@ import ( "github.com/google/gnxi/utils/xpath" gnmiLib "github.com/openconfig/gnmi/proto/gnmi" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/metric" internaltls "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" - jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) //go:embed sample.conf @@ -52,52 +42,25 @@ type GNMI struct { Subscriptions []Subscription `toml:"subscription"` TagSubscriptions []TagSubscription `toml:"tag_subscription"` Aliases map[string]string `toml:"aliases"` + Encoding string `toml:"encoding"` + Origin string `toml:"origin"` + Prefix string `toml:"prefix"` + Target string `toml:"target"` + UpdatesOnly bool `toml:"updates_only"` + Username string `toml:"username"` + Password string `toml:"password"` + Redial config.Duration `toml:"redial"` MaxMsgSize config.Size `toml:"max_msg_size"` - - // Optional subscription configuration - Encoding string - Origin string - Prefix string - Target string - UpdatesOnly bool `toml:"updates_only"` - - // gNMI target credentials - Username string - Password string - - // Redial - Redial config.Duration - - // GRPC TLS settings - EnableTLS bool `toml:"enable_tls"` + Trace bool `toml:"dump_responses"` + EnableTLS bool `toml:"enable_tls"` + Log telegraf.Logger `toml:"-"` internaltls.ClientConfig // Internal state - internalAliases map[string]string - acc telegraf.Accumulator - cancel context.CancelFunc - wg sync.WaitGroup - legacyTags bool - emptyNameWarnShown bool - - Log telegraf.Logger -} - -type Worker struct { - address string - tagStore *tagNode -} - -type tagNode struct { - elem *gnmiLib.PathElem - tagName string - value *gnmiLib.TypedValue - tagStore map[string][]*tagNode -} - -type tagResults struct { - names []string - values []*gnmiLib.TypedValue + internalAliases map[string]string + acc telegraf.Accumulator + cancel context.CancelFunc + wg sync.WaitGroup } // Subscription for a gNMI client @@ -123,6 +86,7 @@ type Subscription struct { // Tag Subscription for a gNMI client type TagSubscription struct { Subscription + Match string `toml:"match"` Elements []string } @@ -141,13 +105,15 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { for i := len(c.Subscriptions) - 1; i >= 0; i-- { subscription := c.Subscriptions[i] - // Support legacy TagOnly subscriptions + // Support and convert legacy TagOnly subscriptions if subscription.TagOnly { - tagSub := convertTagOnlySubscription(subscription) + tagSub := TagSubscription{ + Subscription: subscription, + Match: "name", + } c.TagSubscriptions = append(c.TagSubscriptions, tagSub) // Remove from the original subscriptions list c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...) - c.legacyTags = true continue } if err = subscription.buildFullPath(c); err != nil { @@ -161,8 +127,21 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly { return fmt.Errorf("do not mix legacy tag_only subscriptions and tag subscriptions") } - if len(c.TagSubscriptions[idx].Elements) == 0 { - return fmt.Errorf("tag_subscription must have at least one element") + switch c.TagSubscriptions[idx].Match { + case "": + if len(c.TagSubscriptions[idx].Elements) > 0 { + c.TagSubscriptions[idx].Match = "elements" + } else { + c.TagSubscriptions[idx].Match = "name" + } + case "unconditional": + case "name": + case "elements": + if len(c.TagSubscriptions[idx].Elements) == 0 { + return fmt.Errorf("tag_subscription must have at least one element") + } + default: + return fmt.Errorf("unknown match type %q for tag-subscription %q", c.TagSubscriptions[idx].Match, c.TagSubscriptions[idx].Name) } } @@ -206,12 +185,11 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { // Create a goroutine for each device, dial and subscribe c.wg.Add(len(c.Addresses)) for _, addr := range c.Addresses { - worker := Worker{address: addr} - worker.tagStore = &tagNode{} - go func(worker Worker) { + go func(addr string) { defer c.wg.Done() + h := newHandler(addr, c.internalAliases, c.TagSubscriptions, int(c.MaxMsgSize), c.Log, c.Trace) for ctx.Err() == nil { - if err := c.subscribeGNMI(ctx, &worker, tlscfg, request); err != nil && ctx.Err() == nil { + if err := h.subscribeGNMI(ctx, acc, tlscfg, request); err != nil && ctx.Err() == nil { acc.AddError(err) } @@ -220,7 +198,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { case <-time.After(time.Duration(c.Redial)): } } - }(worker) + }(addr) } return nil } @@ -291,239 +269,6 @@ func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) { }, nil } -// SubscribeGNMI and extract telemetry data -func (c *GNMI) subscribeGNMI(ctx context.Context, worker *Worker, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { - var creds credentials.TransportCredentials - if tlscfg != nil { - creds = credentials.NewTLS(tlscfg) - } else { - creds = insecure.NewCredentials() - } - opts := []grpc.DialOption{ - grpc.WithTransportCredentials(creds), - } - - if c.MaxMsgSize > 0 { - opts = append(opts, grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(int(c.MaxMsgSize)), - )) - } - - client, err := grpc.DialContext(ctx, worker.address, opts...) - if err != nil { - return fmt.Errorf("failed to dial: %v", err) - } - defer client.Close() - - subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx) - if err != nil { - return fmt.Errorf("failed to setup subscription: %v", err) - } - - if err = subscribeClient.Send(request); err != nil { - // If io.EOF is returned, the stream may have ended and stream status - // can be determined by calling Recv. - if err != io.EOF { - return fmt.Errorf("failed to send subscription request: %v", err) - } - } - - c.Log.Debugf("Connection to gNMI device %s established", worker.address) - defer c.Log.Debugf("Connection to gNMI device %s closed", worker.address) - for ctx.Err() == nil { - var reply *gnmiLib.SubscribeResponse - if reply, err = subscribeClient.Recv(); err != nil { - if err != io.EOF && ctx.Err() == nil { - return fmt.Errorf("aborted gNMI subscription: %v", err) - } - break - } - - c.handleSubscribeResponse(worker, reply) - } - return nil -} - -func (c *GNMI) handleSubscribeResponse(worker *Worker, reply *gnmiLib.SubscribeResponse) { - if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok { - c.handleSubscribeResponseUpdate(worker, response) - } -} - -// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data -func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.SubscribeResponse_Update) { - var prefix, prefixAliasPath string - grouper := metric.NewSeriesGrouper() - timestamp := time.Unix(0, response.Update.Timestamp) - prefixTags := make(map[string]string) - - if response.Update.Prefix != nil { - var err error - if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, c.internalAliases, ""); err != nil { - c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err) - } - } - - prefixTags["source"], _, _ = net.SplitHostPort(worker.address) - prefixTags["path"] = prefix - - // Process and remove tag-only updates from the response - for i := len(response.Update.Update) - 1; i >= 0; i-- { - update := response.Update.Update[i] - fullPath := pathWithPrefix(response.Update.Prefix, update.Path) - for _, tagSub := range c.TagSubscriptions { - if equalPathNoKeys(fullPath, tagSub.fullPath) { - c.Log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update) - worker.storeTags(update, tagSub) - response.Update.Update = append(response.Update.Update[:i], response.Update.Update[i+1:]...) - } - } - } - - // Parse individual Update message and create measurements - var name, lastAliasPath string - for _, update := range response.Update.Update { - fullPath := pathWithPrefix(response.Update.Prefix, update.Path) - - // Prepare tags from prefix - tags := make(map[string]string, len(prefixTags)) - for key, val := range prefixTags { - tags[key] = val - } - aliasPath, fields := c.handleTelemetryField(update, tags, prefix) - - if tagOnlyTags := worker.checkTags(fullPath); tagOnlyTags != nil { - for k, v := range tagOnlyTags { - if alias, ok := c.internalAliases[k]; ok { - tags[alias] = fmt.Sprint(v) - } else { - tags[k] = fmt.Sprint(v) - } - } - } - - // Inherent valid alias from prefix parsing - if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { - aliasPath = prefixAliasPath - } - - // Lookup alias if alias-path has changed - if aliasPath != lastAliasPath { - name = prefix - if alias, ok := c.internalAliases[aliasPath]; ok { - name = alias - } else { - c.Log.Debugf("No measurement alias for gNMI path: %s", name) - } - } - - // Check for empty names - if name == "" && !c.emptyNameWarnShown { - c.Log.Warnf(emptyNameWarning, response.Update) - c.emptyNameWarnShown = true - } - - // Group metrics - for k, v := range fields { - key := k - if len(aliasPath) < len(key) && len(aliasPath) != 0 { - // This may not be an exact prefix, due to naming style - // conversion on the key. - key = key[len(aliasPath)+1:] - } else if len(aliasPath) >= len(key) { - // Otherwise use the last path element as the field key. - key = path.Base(key) - - // If there are no elements skip the item; this would be an - // invalid message. - key = strings.TrimLeft(key, "/.") - if key == "" { - c.Log.Errorf("invalid empty path: %q", k) - continue - } - } - grouper.Add(name, tags, timestamp, key, v) - } - - lastAliasPath = aliasPath - } - - // Add grouped measurements - for _, metricToAdd := range grouper.Metrics() { - c.acc.AddMetric(metricToAdd) - } -} - -// HandleTelemetryField and add it to a measurement -func (c *GNMI) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { - gpath, aliasPath, err := handlePath(update.Path, tags, c.internalAliases, prefix) - if err != nil { - c.Log.Errorf("handling path %q failed: %v", update.Path, err) - } - fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val) - if err != nil { - c.Log.Errorf("error parsing update value %q: %v", update.Val, err) - } - return aliasPath, fields -} - -// Parse path to path-buffer and tag-field -func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) { - builder := bytes.NewBufferString(prefix) - - // Some devices do report the origin in the first path element - // so try to find out if this is the case. - if gnmiPath.Origin == "" && len(gnmiPath.Elem) > 0 { - groups := originPattern.FindStringSubmatch(gnmiPath.Elem[0].Name) - if len(groups) == 2 { - gnmiPath.Origin = groups[1] - gnmiPath.Elem[0].Name = gnmiPath.Elem[0].Name[len(groups[1])+1:] - } - } - - // Prefix with origin - if len(gnmiPath.Origin) > 0 { - if _, err := builder.WriteString(gnmiPath.Origin); err != nil { - return "", "", err - } - if _, err := builder.WriteRune(':'); err != nil { - return "", "", err - } - } - - // Parse generic keys from prefix - for _, elem := range gnmiPath.Elem { - if len(elem.Name) > 0 { - if _, err := builder.WriteRune('/'); err != nil { - return "", "", err - } - if _, err := builder.WriteString(elem.Name); err != nil { - return "", "", err - } - } - name := builder.String() - - if _, exists := aliases[name]; exists { - aliasPath = name - } - - if tags != nil { - for key, val := range elem.Key { - key = strings.ReplaceAll(key, "-", "_") - - // Use short-form of key if possible - if _, exists := tags[key]; exists { - tags[name+"/"+key] = val - } else { - tags[key] = val - } - } - } - } - - return builder.String(), aliasPath, nil -} - // ParsePath from XPath-like string to gNMI path structure func parsePath(origin string, pathToParse string, target string) (*gnmiLib.Path, error) { gnmiPath, err := xpath.ToGNMIPath(pathToParse) @@ -559,45 +304,6 @@ func init() { inputs.Add("cisco_telemetry_gnmi", New) } -func convertTagOnlySubscription(s Subscription) TagSubscription { - t := TagSubscription{Subscription: s, Elements: []string{"interface"}} - return t -} - -// equalPathNoKeys checks if two gNMI paths are equal, without keys -func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool { - if len(a.Elem) != len(b.Elem) { - return false - } - for i := range a.Elem { - if a.Elem[i].Name != b.Elem[i].Name { - return false - } - } - return true -} - -func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem { - var newPath []*gnmiLib.PathElem - for _, elem := range gpath.Elem { - if elem.Key != nil { - newPath = append(newPath, elem) - } - } - return newPath -} - -func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path { - if prefix == nil { - return gpath - } - fullPath := new(gnmiLib.Path) - fullPath.Origin = prefix.Origin - fullPath.Target = prefix.Target - fullPath.Elem = append(prefix.Elem, gpath.Elem...) - return fullPath -} - func (s *Subscription) buildFullPath(c *GNMI) error { var err error if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil { @@ -618,81 +324,6 @@ func (s *Subscription) buildFullPath(c *GNMI) error { return nil } -func (w *Worker) storeTags(update *gnmiLib.Update, sub TagSubscription) { - updateKeys := pathKeys(update.Path) - var foundKey bool - for _, requiredKey := range sub.Elements { - foundKey = false - for _, elem := range updateKeys { - if elem.Name == requiredKey { - foundKey = true - } - } - if !foundKey { - return - } - } - // All required keys present for this TagSubscription - w.tagStore.insert(updateKeys, sub.Name, update.Val) -} - -func (node *tagNode) insert(keys []*gnmiLib.PathElem, name string, value *gnmiLib.TypedValue) { - if len(keys) == 0 { - node.value = value - node.tagName = name - return - } - var found *tagNode - key := keys[0] - keyName := key.Name - if node.tagStore == nil { - node.tagStore = make(map[string][]*tagNode) - } - if _, ok := node.tagStore[keyName]; !ok { - node.tagStore[keyName] = make([]*tagNode, 0) - } - for _, node := range node.tagStore[keyName] { - if compareKeys(node.elem.Key, key.Key) { - found = node - break - } - } - if found == nil { - found = &tagNode{elem: keys[0]} - node.tagStore[keyName] = append(node.tagStore[keyName], found) - } - found.insert(keys[1:], name, value) -} - -func (node *tagNode) retrieve(keys []*gnmiLib.PathElem, tagResults *tagResults) { - if node.value != nil { - tagResults.names = append(tagResults.names, node.tagName) - tagResults.values = append(tagResults.values, node.value) - } - for _, key := range keys { - if elems, ok := node.tagStore[key.Name]; ok { - for _, node := range elems { - if compareKeys(node.elem.Key, key.Key) { - node.retrieve(keys, tagResults) - } - } - } - } -} - -func (w *Worker) checkTags(fullPath *gnmiLib.Path) map[string]interface{} { - results := &tagResults{} - w.tagStore.retrieve(pathKeys(fullPath), results) - tags := make(map[string]interface{}) - for idx := range results.names { - vals, _ := gnmiToFields(results.names[idx], results.values[idx]) - for k, v := range vals { - tags[k] = v - } - } - return tags -} - func (s *Subscription) buildAlias(aliases map[string]string) error { var err error var gnmiLongPath, gnmiShortPath *gnmiLib.Path @@ -725,69 +356,3 @@ func (s *Subscription) buildAlias(aliases map[string]string) error { } return nil } - -func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) { - var value interface{} - var jsondata []byte - - // Make sure a value is actually set - if updateVal == nil || updateVal.Value == nil { - return nil, nil - } - - switch val := updateVal.Value.(type) { - case *gnmiLib.TypedValue_AsciiVal: - value = val.AsciiVal - case *gnmiLib.TypedValue_BoolVal: - value = val.BoolVal - case *gnmiLib.TypedValue_BytesVal: - value = val.BytesVal - case *gnmiLib.TypedValue_DoubleVal: - value = val.DoubleVal - case *gnmiLib.TypedValue_DecimalVal: - //nolint:staticcheck // to maintain backward compatibility with older gnmi specs - value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision)) - case *gnmiLib.TypedValue_FloatVal: - //nolint:staticcheck // to maintain backward compatibility with older gnmi specs - value = val.FloatVal - case *gnmiLib.TypedValue_IntVal: - value = val.IntVal - case *gnmiLib.TypedValue_StringVal: - value = val.StringVal - case *gnmiLib.TypedValue_UintVal: - value = val.UintVal - case *gnmiLib.TypedValue_JsonIetfVal: - jsondata = val.JsonIetfVal - case *gnmiLib.TypedValue_JsonVal: - jsondata = val.JsonVal - } - - fields := make(map[string]interface{}) - if value != nil { - fields[name] = value - } else if jsondata != nil { - if err := json.Unmarshal(jsondata, &value); err != nil { - return nil, fmt.Errorf("failed to parse JSON value: %v", err) - } - flattener := jsonparser.JSONFlattener{Fields: fields} - if err := flattener.FullFlattenJSON(name, value, true, true); err != nil { - return nil, fmt.Errorf("failed to flatten JSON: %v", err) - } - } - return fields, nil -} - -func compareKeys(a map[string]string, b map[string]string) bool { - if len(a) != len(b) { - return false - } - for k, v := range a { - if _, ok := b[k]; !ok { - return false - } - if b[k] != v { - return false - } - } - return true -} diff --git a/plugins/inputs/gnmi/gnmi_test.go b/plugins/inputs/gnmi/gnmi_test.go index df1680c94..38ad6afd6 100644 --- a/plugins/inputs/gnmi/gnmi_test.go +++ b/plugins/inputs/gnmi/gnmi_test.go @@ -2,9 +2,12 @@ package gnmi import ( "context" + "encoding/json" "errors" "fmt" "net" + "os" + "path/filepath" "sync" "testing" "time" @@ -13,9 +16,12 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/encoding/protojson" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -96,8 +102,9 @@ func TestWaitError(t *testing.T) { grpcServer.Stop() wg.Wait() - require.Contains(t, acc.Errors, - errors.New("aborted gNMI subscription: rpc error: code = Unknown desc = testerror")) + // Check if the expected error text is among the errors + require.Len(t, acc.Errors, 1) + require.ErrorContains(t, acc.Errors[0], "aborted gNMI subscription: rpc error: code = Unknown desc = testerror") } func TestUsernamePassword(t *testing.T) { @@ -154,8 +161,9 @@ func TestUsernamePassword(t *testing.T) { grpcServer.Stop() wg.Wait() - require.Contains(t, acc.Errors, - errors.New("aborted gNMI subscription: rpc error: code = Unknown desc = success")) + // Check if the expected error text is among the errors + require.Len(t, acc.Errors, 1) + require.ErrorContains(t, acc.Errors[0], "aborted gNMI subscription: rpc error: code = Unknown desc = success") } func mockGNMINotification() *gnmiLib.Notification { @@ -478,10 +486,9 @@ func TestNotification(t *testing.T) { testutil.MustMetric( "oc-intf-counters", map[string]string{ - "path": "", - "source": "127.0.0.1", - "name": "Ethernet1", - "oc-intf-desc": "foo", + "source": "127.0.0.1", + "name": "Ethernet1", + "oc-intf-desc/description": "foo", }, map[string]interface{}{ "in_broadcast_pkts": 42, @@ -626,6 +633,7 @@ func TestNotification(t *testing.T) { }, }, } + return server.Send(taggedResponse) }, }, @@ -633,11 +641,10 @@ func TestNotification(t *testing.T) { testutil.MustMetric( "oc-neigh-state", map[string]string{ - "path": "", - "source": "127.0.0.1", - "neighbor_address": "192.0.2.1", - "name": "default", - "oc-neigh-desc": "EXAMPLE-PEER", + "source": "127.0.0.1", + "neighbor_address": "192.0.2.1", + "name": "default", + "oc-neigh-desc/description": "EXAMPLE-PEER", "/network-instances/network-instance/protocols/protocol/name": "BGP", "identifier": "BGP", }, @@ -1011,193 +1018,128 @@ func TestRedial(t *testing.T) { wg.Wait() } -func TestTagNode(t *testing.T) { - type insertOp struct { - keys []*gnmiLib.PathElem - name string - value *gnmiLib.TypedValue - } - interfaceElemSingleKey := &gnmiLib.PathElem{ - Name: "interface", - Key: map[string]string{"name": "Management0"}, - } - networkInstanceSingleKey := &gnmiLib.PathElem{ - Name: "network-instance", - Key: map[string]string{"name": "default"}, - } - protocolDoubleKey := &gnmiLib.PathElem{ - Name: "protocol", - Key: map[string]string{"name": "BGP", "protocol": "BGP"}, - } - neighborSingleKey := &gnmiLib.PathElem{ - Name: "neighbor", - Key: map[string]string{"neighbor_address": "192.0.2.1"}, - } - tests := []struct { - name string - insertOps []insertOp - expected *tagNode - }{ - { - name: "single elem single key insert", - insertOps: []insertOp{ - { - keys: []*gnmiLib.PathElem{interfaceElemSingleKey}, - name: "tagFoo", - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}}, - }, - }, - expected: &tagNode{ - tagStore: map[string][]*tagNode{ - "interface": { - { - elem: interfaceElemSingleKey, - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}}, - tagName: "tagFoo", - }, - }, - }, - }, - }, - { - name: "double elem single key insert", - insertOps: []insertOp{ - { - keys: []*gnmiLib.PathElem{interfaceElemSingleKey, networkInstanceSingleKey}, - name: "tagBar", - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}}, - }, - }, - expected: &tagNode{ - tagStore: map[string][]*tagNode{ - "interface": { - { - elem: interfaceElemSingleKey, - tagStore: map[string][]*tagNode{ - "network-instance": { - { - elem: networkInstanceSingleKey, - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}}, - tagName: "tagBar", - }, - }, - }, - }, - }, - }, - }, - }, - { - name: "single elem double key insert", - insertOps: []insertOp{ - { - keys: []*gnmiLib.PathElem{protocolDoubleKey}, - name: "doubleKey", - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}}, - }, - }, - expected: &tagNode{ - tagStore: map[string][]*tagNode{ - "protocol": { - { - elem: protocolDoubleKey, - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}}, - tagName: "doubleKey", - }, - }, - }, - }, - }, - { - name: "multi elem unrelated insert", - insertOps: []insertOp{ - { - keys: []*gnmiLib.PathElem{interfaceElemSingleKey}, - name: "intf_desc", - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}}, - }, - { - keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey, neighborSingleKey}, - name: "bgp_neigh_desc", - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}}, - }, - }, - expected: &tagNode{ - tagStore: map[string][]*tagNode{ - "interface": { - { - elem: interfaceElemSingleKey, - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}}, - tagName: "intf_desc", - }, - }, - "network-instance": { - { - elem: networkInstanceSingleKey, - tagStore: map[string][]*tagNode{ - "protocol": { - { - elem: protocolDoubleKey, - tagStore: map[string][]*tagNode{ - "neighbor": { - { - elem: neighborSingleKey, - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}}, - tagName: "bgp_neigh_desc", - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - { - name: "values at multiple levels", - insertOps: []insertOp{ - { - keys: []*gnmiLib.PathElem{networkInstanceSingleKey}, - name: "vrf_stuff", - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}}, - }, - { - keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey}, - name: "protocol_stuff", - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}}, - }, - }, - expected: &tagNode{ - tagStore: map[string][]*tagNode{ - "network-instance": { - { - elem: networkInstanceSingleKey, - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}}, - tagName: "vrf_stuff", - tagStore: map[string][]*tagNode{ - "protocol": { - { - elem: protocolDoubleKey, - value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}}, - tagName: "protocol_stuff", - }, - }, - }, - }, - }, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - rootNode := new(tagNode) - for _, s := range tt.insertOps { - rootNode.insert(s.keys, s.name, s.value) +func TestCases(t *testing.T) { + // Get all testcase directories + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Register the plugin + inputs.Add("gnmi", New) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + configFilename := filepath.Join(testcasePath, "telegraf.conf") + inputFilename := filepath.Join(testcasePath, "responses.json") + expectedFilename := filepath.Join(testcasePath, "expected.out") + expectedErrorFilename := filepath.Join(testcasePath, "expected.err") + + // Load the input data + buf, err := os.ReadFile(inputFilename) + require.NoError(t, err) + var entries []json.RawMessage + require.NoError(t, json.Unmarshal(buf, &entries)) + responses := make([]gnmiLib.SubscribeResponse, len(entries)) + for i, entry := range entries { + require.NoError(t, protojson.Unmarshal(entry, &responses[i])) } - require.Equal(t, rootNode, tt.expected) + + // Prepare the influx parser for expectations + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + // Read the expected output if any + var expected []telegraf.Metric + if _, err := os.Stat(expectedFilename); err == nil { + var err error + expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser) + require.NoError(t, err) + } + + // Read the expected output if any + var expectedErrors []string + if _, err := os.Stat(expectedErrorFilename); err == nil { + var err error + expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename) + require.NoError(t, err) + require.NotEmpty(t, expectedErrors) + } + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.Len(t, cfg.Inputs, 1) + + // Prepare the server response + responseFunction := func(server gnmiLib.GNMI_SubscribeServer) error { + sync := &gnmiLib.SubscribeResponse{ + Response: &gnmiLib.SubscribeResponse_SyncResponse{ + SyncResponse: true, + }, + } + _ = sync + for i := range responses { + if err := server.Send(&responses[i]); err != nil { + return err + } + } + + return nil + } + + // Setup a mock server + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + grpcServer := grpc.NewServer() + gnmiServer := &MockServer{ + SubscribeF: responseFunction, + GRPCServer: grpcServer, + } + gnmiLib.RegisterGNMIServer(grpcServer, gnmiServer) + + // Setup the plugin + plugin := cfg.Inputs[0].Input.(*GNMI) + plugin.Addresses = []string{listener.Addr().String()} + plugin.Log = testutil.Logger{} + + // Start the server + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := grpcServer.Serve(listener) + require.NoError(t, err) + }() + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + + require.Eventually(t, + func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, 1*time.Second, 100*time.Millisecond) + plugin.Stop() + grpcServer.Stop() + wg.Wait() + + // Check for errors + require.Len(t, acc.Errors, len(expectedErrors)) + if len(acc.Errors) > 0 { + var actualErrorMsgs []string + for _, err := range acc.Errors { + actualErrorMsgs = append(actualErrorMsgs, err.Error()) + } + require.ElementsMatch(t, actualErrorMsgs, expectedErrors) + } + + // Check the metric nevertheless as we might get some metrics despite errors. + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) }) } } diff --git a/plugins/inputs/gnmi/handler.go b/plugins/inputs/gnmi/handler.go new file mode 100644 index 000000000..1b5393f28 --- /dev/null +++ b/plugins/inputs/gnmi/handler.go @@ -0,0 +1,238 @@ +package gnmi + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "path" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + gnmiLib "github.com/openconfig/gnmi/proto/gnmi" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/encoding/protojson" +) + +type handler struct { + address string + aliases map[string]string + tagsubs []TagSubscription + maxMsgSize int + emptyNameWarnShown bool + tagStore *tagStore + trace bool + log telegraf.Logger +} + +func newHandler(addr string, aliases map[string]string, subs []TagSubscription, maxsize int, l telegraf.Logger, trace bool) *handler { + return &handler{ + address: addr, + aliases: aliases, + tagsubs: subs, + maxMsgSize: maxsize, + tagStore: newTagStore(subs), + trace: trace, + log: l, + } +} + +// SubscribeGNMI and extract telemetry data +func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { + var creds credentials.TransportCredentials + if tlscfg != nil { + creds = credentials.NewTLS(tlscfg) + } else { + creds = insecure.NewCredentials() + } + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(creds), + } + + if h.maxMsgSize > 0 { + opts = append(opts, grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(h.maxMsgSize), + )) + } + + client, err := grpc.DialContext(ctx, h.address, opts...) + if err != nil { + return fmt.Errorf("failed to dial: %v", err) + } + defer client.Close() + + subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx) + if err != nil { + return fmt.Errorf("failed to setup subscription: %w", err) + } + + // If io.EOF is returned, the stream may have ended and stream status + // can be determined by calling Recv. + if err := subscribeClient.Send(request); err != nil && err != io.EOF { + return fmt.Errorf("failed to send subscription request: %w", err) + } + + h.log.Debugf("Connection to gNMI device %s established", h.address) + defer h.log.Debugf("Connection to gNMI device %s closed", h.address) + for ctx.Err() == nil { + var reply *gnmiLib.SubscribeResponse + if reply, err = subscribeClient.Recv(); err != nil { + if err != io.EOF && ctx.Err() == nil { + return fmt.Errorf("aborted gNMI subscription: %w", err) + } + break + } + + if h.trace { + buf, err := protojson.Marshal(reply) + if err != nil { + h.log.Debugf("marshal failed: %v", err) + } else { + t := reply.GetUpdate().GetTimestamp() + h.log.Debugf("update_%v: %s", t, string(buf)) + } + } + + if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok { + h.handleSubscribeResponseUpdate(acc, response) + } + } + return nil +} + +// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data +func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, response *gnmiLib.SubscribeResponse_Update) { + var prefix, prefixAliasPath string + grouper := metric.NewSeriesGrouper() + timestamp := time.Unix(0, response.Update.Timestamp) + prefixTags := make(map[string]string) + + if response.Update.Prefix != nil { + var err error + if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, h.aliases, ""); err != nil { + h.log.Errorf("handling path %q failed: %v", response.Update.Prefix, err) + } + } + + prefixTags["source"], _, _ = net.SplitHostPort(h.address) + if prefix != "" { + prefixTags["path"] = prefix + } + + // Process and remove tag-updates from the response first so we will + // add all available tags to the metrics later. + var valueUpdates []*gnmiLib.Update + for _, update := range response.Update.Update { + fullPath := pathWithPrefix(response.Update.Prefix, update.Path) + + // Prepare tags from prefix + tags := make(map[string]string, len(prefixTags)) + for key, val := range prefixTags { + tags[key] = val + } + + _, fields := h.handleTelemetryField(update, tags, prefix) + + var tagUpdate bool + for _, tagSub := range h.tagsubs { + if !equalPathNoKeys(fullPath, tagSub.fullPath) { + continue + } + h.log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update) + if err := h.tagStore.insert(tagSub, fullPath, fields, tags); err != nil { + h.log.Errorf("inserting tag failed: %w", err) + } + tagUpdate = true + break + } + if !tagUpdate { + valueUpdates = append(valueUpdates, update) + } + } + + // Parse individual Update message and create measurements + var name, lastAliasPath string + for _, update := range valueUpdates { + fullPath := pathWithPrefix(response.Update.Prefix, update.Path) + + // Prepare tags from prefix + tags := make(map[string]string, len(prefixTags)) + for key, val := range prefixTags { + tags[key] = val + } + + aliasPath, fields := h.handleTelemetryField(update, tags, prefix) + + // Add the tags derived via tag-subscriptions + for k, v := range h.tagStore.lookup(fullPath, tags) { + tags[k] = v + } + + // Inherent valid alias from prefix parsing + if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { + aliasPath = prefixAliasPath + } + + // Lookup alias if alias-path has changed + if aliasPath != lastAliasPath { + name = prefix + if alias, ok := h.aliases[aliasPath]; ok { + name = alias + } else { + h.log.Debugf("No measurement alias for gNMI path: %s", name) + } + lastAliasPath = aliasPath + } + + // Check for empty names + if name == "" && !h.emptyNameWarnShown { + h.log.Warnf(emptyNameWarning, response.Update) + h.emptyNameWarnShown = true + } + + // Group metrics + for k, v := range fields { + key := k + if len(aliasPath) < len(key) && len(aliasPath) != 0 { + // This may not be an exact prefix, due to naming style + // conversion on the key. + key = key[len(aliasPath)+1:] + } else if len(aliasPath) >= len(key) { + // Otherwise use the last path element as the field key. + key = path.Base(key) + + // If there are no elements skip the item; this would be an + // invalid message. + key = strings.TrimLeft(key, "/.") + if key == "" { + h.log.Errorf("invalid empty path: %q", k) + continue + } + } + grouper.Add(name, tags, timestamp, key, v) + } + } + + // Add grouped measurements + for _, metricToAdd := range grouper.Metrics() { + acc.AddMetric(metricToAdd) + } +} + +// HandleTelemetryField and add it to a measurement +func (h *handler) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { + gpath, aliasPath, err := handlePath(update.Path, tags, h.aliases, prefix) + if err != nil { + h.log.Errorf("handling path %q failed: %v", update.Path, err) + } + fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val) + if err != nil { + h.log.Errorf("error parsing update value %q: %v", update.Val, err) + } + return aliasPath, fields +} diff --git a/plugins/inputs/gnmi/sample.conf b/plugins/inputs/gnmi/sample.conf index 8d043b000..cc8ade872 100644 --- a/plugins/inputs/gnmi/sample.conf +++ b/plugins/inputs/gnmi/sample.conf @@ -66,11 +66,26 @@ # [[inputs.gnmi.tag_subscription]] # ## When applying this value as a tag to other metrics, use this tag name # name = "descr" + # # ## All other subscription fields are as normal # origin = "openconfig-interfaces" # path = "/interfaces/interface/state" # subscription_mode = "on_change" - # ## At least one path element name must be supplied that contains at least - # ## one key to match on. Multiple element names can be specified in any - # ## order. In this case all element names must be present. - # elements = ["description", "interface"] + # + # ## Match strategy to use for the tag. + # ## Tags are only applied for metrics of the same address. The following + # ## settings are valid: + # ## unconditional -- always match + # ## name -- match by the "name" key + # ## This resembles the previsou 'tag-only' behavior. + # ## elements -- match by the keys in the path filtered by the path + # ## parts specified `elements` below + # ## By default, 'elements' is used if the 'elements' option is provided, + # ## otherwise match by 'name'. + # # match = "" + # + # ## For the 'elements' match strategy, at least one path-element name must + # ## be supplied containing at least one key to match on. Multiple path + # ## elements can be specified in any order. All given keys must be equal + # ## for a match. + # # elements = ["description", "interface"] diff --git a/plugins/inputs/gnmi/tag_store.go b/plugins/inputs/gnmi/tag_store.go new file mode 100644 index 000000000..a1b2a663c --- /dev/null +++ b/plugins/inputs/gnmi/tag_store.go @@ -0,0 +1,177 @@ +package gnmi + +import ( + "fmt" + "path/filepath" + "sort" + "strings" + + "github.com/influxdata/telegraf/internal" + gnmiLib "github.com/openconfig/gnmi/proto/gnmi" +) + +type tagStore struct { + unconditional map[string]string + names map[string]map[string]string + elements elementsStore +} + +type elementsStore struct { + required [][]string + tags map[string]map[string]string +} + +func newTagStore(subs []TagSubscription) *tagStore { + store := tagStore{ + unconditional: make(map[string]string), + names: make(map[string]map[string]string), + elements: elementsStore{ + required: make([][]string, 0, len(subs)), + tags: make(map[string]map[string]string), + }, + } + for _, s := range subs { + if s.Match == "elements" { + store.elements.required = append(store.elements.required, s.Elements) + } + } + + return &store +} + +// Store tags extracted from TagSubscriptions +func (s *tagStore) insert(subscription TagSubscription, path *gnmiLib.Path, values map[string]interface{}, tags map[string]string) error { + switch subscription.Match { + case "unconditional": + for k, v := range values { + tagName := subscription.Name + "/" + filepath.Base(k) + sv, err := internal.ToString(v) + if err != nil { + return fmt.Errorf("conversion error for %v: %w", v, err) + } + if sv == "" { + delete(s.unconditional, tagName) + } else { + s.unconditional[tagName] = sv + } + } + case "name": + // Get the lookup key + key, found := tags["name"] + if !found { + return nil + } + + // Make sure we have a valid map for the key + if _, exists := s.names[key]; !exists { + s.names[key] = make(map[string]string) + } + + // Add the values + for k, v := range values { + tagName := subscription.Name + "/" + filepath.Base(k) + sv, err := internal.ToString(v) + if err != nil { + return fmt.Errorf("conversion error for %v: %w", v, err) + } + if sv == "" { + delete(s.names[key], tagName) + } else { + s.names[key][tagName] = sv + } + } + case "elements": + key, match := s.getElementsKeys(path, subscription.Elements) + if !match || len(values) == 0 { + return nil + } + + // Make sure we have a valid map for the key + if _, exists := s.elements.tags[key]; !exists { + s.elements.tags[key] = make(map[string]string) + } + + // Add the values + for k, v := range values { + tagName := subscription.Name + "/" + filepath.Base(k) + sv, err := internal.ToString(v) + if err != nil { + return fmt.Errorf("conversion error for %v: %w", v, err) + } + if sv == "" { + delete(s.elements.tags[key], tagName) + } else { + s.elements.tags[key][tagName] = sv + } + } + default: + return fmt.Errorf("unknown match strategy %q", subscription.Match) + } + + return nil +} + +func (s *tagStore) lookup(path *gnmiLib.Path, metricTags map[string]string) map[string]string { + // Add all unconditional tags + tags := make(map[string]string, len(s.unconditional)) + for k, v := range s.unconditional { + tags[k] = v + } + + // Match names + key, found := metricTags["name"] + if found { + for k, v := range s.names[key] { + tags[k] = v + } + } + + // Match elements + for _, requiredKeys := range s.elements.required { + key, match := s.getElementsKeys(path, requiredKeys) + if !match { + continue + } + for k, v := range s.elements.tags[key] { + tags[k] = v + } + } + + return tags +} + +func (s *tagStore) getElementsKeys(path *gnmiLib.Path, elements []string) (string, bool) { + keyElements := pathKeys(path) + + // Search for the required path elements and collect a ordered + // list of their values to in the form + // elementName1={keyA=valueA,keyB=valueB,...},...,elementNameN={keyY=valueY,keyZ=valueZ} + // where each elements' key-value list is enclosed in curly brackets. + keyParts := make([]string, 0, len(elements)) + for _, requiredElement := range elements { + var found bool + var elementKVs []string + for _, el := range keyElements { + if el.Name == requiredElement { + for k, v := range el.Key { + elementKVs = append(elementKVs, k+"="+v) + } + found = true + break + } + } + + // The element was not found, but all must match + if !found { + return "", false + } + + // We need to order the element's key-value pairs as the map + // returns elements in random order + sort.Strings(elementKVs) + + // Collect the element + keyParts = append(keyParts, requiredElement+"={"+strings.Join(elementKVs, ",")+"}") + } + return strings.Join(keyParts, ","), true +} diff --git a/plugins/inputs/gnmi/testcases/issue_11011/expected.out b/plugins/inputs/gnmi/testcases/issue_11011/expected.out new file mode 100644 index 000000000..37654befc --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11011/expected.out @@ -0,0 +1 @@ +oc-neigh-state,source=127.0.0.1,neighbor_address=192.0.2.1,name=default,oc-neigh-desc/description=EXAMPLE-PEER,/network-instances/network-instance/protocols/protocol/name=BGP,identifier=BGP session_state="ESTABLISHED" 1543236572000000000 diff --git a/plugins/inputs/gnmi/testcases/issue_11011/responses.json b/plugins/inputs/gnmi/testcases/issue_11011/responses.json new file mode 100644 index 000000000..d8b36705c --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11011/responses.json @@ -0,0 +1,116 @@ +[ + { + "update": { + "timestamp": "1543236571000000000", + "prefix": {}, + "update": [ + { + "path": { + "elem": [ + { + "name": "network-instances" + }, + { + "name": "network-instance", + "key": { + "name": "default" + } + }, + { + "name": "protocols" + }, + { + "name": "protocol", + "key": { + "identifier": "BGP", + "name": "BGP" + } + }, + { + "name": "bgp" + }, + { + "name": "neighbors" + }, + { + "name": "neighbor", + "key": { + "neighbor_address": "192.0.2.1" + } + }, + { + "name": "state" + }, + { + "name": "description" + } + ] + }, + "val": { + "stringVal": "EXAMPLE-PEER" + } + } + ] + } + }, + { + "syncResponse": true + }, + { + "update": { + "timestamp": "1543236572000000000", + "prefix": {}, + "update": [ + { + "path": { + "elem": [ + { + "name": "network-instances" + }, + { + "name": "network-instance", + "key": { + "name": "default" + } + }, + { + "name": "protocols" + }, + { + "name": "protocol", + "key": { + "identifier": "BGP", + "name": "BGP" + } + }, + { + "name": "bgp" + }, + { + "name": "neighbors" + }, + { + "name": "neighbor", + "key": { + "neighbor_address": "192.0.2.1" + } + }, + { + "name": "state" + }, + { + "name": "session-state" + } + ] + }, + "val": { + "stringVal": "ESTABLISHED" + } + } + ] + } + }, + { + "syncResponse": true + } +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_11011/telegraf.conf b/plugins/inputs/gnmi/testcases/issue_11011/telegraf.conf new file mode 100644 index 000000000..c59b9e667 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11011/telegraf.conf @@ -0,0 +1,14 @@ +[[inputs.gnmi]] + addresses = ["127.0.0.1"] + [[inputs.gnmi.subscription]] + name = "oc-neigh-state" + origin = "openconfig" + path = "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/session-state" + subscription_mode = "sample" + sample_interval = "10s" + [[inputs.gnmi.tag_subscription]] + name = "oc-neigh-desc" + origin = "openconfig" + path = "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/description" + subscription_mode = "on_change" + elements = ["network-instance", "protocol", "neighbor"] diff --git a/plugins/inputs/gnmi/testcases/issue_11778/expected.out b/plugins/inputs/gnmi/testcases/issue_11778/expected.out new file mode 100644 index 000000000..0bbefdf36 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11778/expected.out @@ -0,0 +1 @@ +ifcounters,path=openconfig-interfaces:/interfaces/interface/state,name=eth42,descr/description=eth42,source=127.0.0.1 counters=5678i 1673608605875353770 \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_11778/responses.json b/plugins/inputs/gnmi/testcases/issue_11778/responses.json new file mode 100644 index 000000000..9e9e59677 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11778/responses.json @@ -0,0 +1,49 @@ +[ + { + "update": { + "timestamp": "1673608605875353770", + "prefix": { + "origin": "openconfig-interfaces", + "elem": [ + { + "name": "interfaces" + }, + { + "name": "interface", + "key":{"name":"eth42"} + }, + { + "name": "state" + } + ], + "target": "subscription" + }, + "update": [ + { + "path": { + "elem": [ + { + "name": "counters" + } + ] + }, + "val": { + "intVal": "5678" + } + }, + { + "path": { + "elem": [ + { + "name": "description" + } + ] + }, + "val": { + "stringVal": "eth42" + } + } + ] + } + } +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_11778/telegraf.conf b/plugins/inputs/gnmi/testcases/issue_11778/telegraf.conf new file mode 100644 index 000000000..486cfe80d --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11778/telegraf.conf @@ -0,0 +1,15 @@ +[[inputs.gnmi]] + addresses = ["dummy"] + name_override = "gnmi" + redial = "10s" + [[inputs.gnmi.subscription]] + name = "ifcounters" + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/counters" + subscription_mode = "sample" + sample_interval = "10s" + [[inputs.gnmi.tag_subscription]] + name = "descr" + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/description" + subscription_mode = "on_change" diff --git a/plugins/inputs/gnmi/testcases/issue_11778_tag_only/expected.out b/plugins/inputs/gnmi/testcases/issue_11778_tag_only/expected.out new file mode 100644 index 000000000..0bbefdf36 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11778_tag_only/expected.out @@ -0,0 +1 @@ +ifcounters,path=openconfig-interfaces:/interfaces/interface/state,name=eth42,descr/description=eth42,source=127.0.0.1 counters=5678i 1673608605875353770 \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_11778_tag_only/responses.json b/plugins/inputs/gnmi/testcases/issue_11778_tag_only/responses.json new file mode 100644 index 000000000..9e9e59677 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11778_tag_only/responses.json @@ -0,0 +1,49 @@ +[ + { + "update": { + "timestamp": "1673608605875353770", + "prefix": { + "origin": "openconfig-interfaces", + "elem": [ + { + "name": "interfaces" + }, + { + "name": "interface", + "key":{"name":"eth42"} + }, + { + "name": "state" + } + ], + "target": "subscription" + }, + "update": [ + { + "path": { + "elem": [ + { + "name": "counters" + } + ] + }, + "val": { + "intVal": "5678" + } + }, + { + "path": { + "elem": [ + { + "name": "description" + } + ] + }, + "val": { + "stringVal": "eth42" + } + } + ] + } + } +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_11778_tag_only/telegraf.conf b/plugins/inputs/gnmi/testcases/issue_11778_tag_only/telegraf.conf new file mode 100644 index 000000000..597d720b8 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_11778_tag_only/telegraf.conf @@ -0,0 +1,16 @@ +[[inputs.gnmi]] + addresses = ["rt-01:nnnnn", "rt-02:nnnnn"] + name_override = "gnmi" + redial = "10s" + [[inputs.gnmi.subscription]] + name = "ifcounters" + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/counters" + subscription_mode = "sample" + sample_interval = "10s" + [[inputs.gnmi.subscription]] + name = "descr" + origin = "openconfig-interfaces" + path = "/interfaces/interface/state/description" + subscription_mode = "on_change" + tag_only = true diff --git a/plugins/inputs/gnmi/testcases/tagging_name_based/expected.out b/plugins/inputs/gnmi/testcases/tagging_name_based/expected.out new file mode 100644 index 000000000..40343f3fe --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_name_based/expected.out @@ -0,0 +1,5 @@ +interfaces_logical_status,descr/description=Loopback,index=0,name=lo0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667238697959 +interfaces_logical_status,descr/description=Local:Mgmt,index=312,name=irb,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667241404442 +interfaces_logical_status,descr/description=Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel},index=2,name=gr-0/0/0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667243155079 +interfaces_logical_status,descr/description=Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345},index=1410,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250570407 +interfaces_logical_status,descr/description=Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890},index=16386,name=xe-0/1/5,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="LOWER_LAYER_DOWN" 1674081667251795605 diff --git a/plugins/inputs/gnmi/testcases/tagging_name_based/responses.json b/plugins/inputs/gnmi/testcases/tagging_name_based/responses.json new file mode 100644 index 000000000..cec8fea82 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_name_based/responses.json @@ -0,0 +1,13 @@ +[ + {"update":{"timestamp":"1674081667224189253", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Loopback"}}]}}, + {"update":{"timestamp":"1674081667226968153", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Local:Mgmt"}}]}}, + {"update":{"timestamp":"1674081667228936729", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"3"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel}"}}]}}, + {"update":{"timestamp":"1674081667236178737", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345}"}}]}}, + {"update":{"timestamp":"1674081667236377628", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890}"}}]}}, + {"update":{"timestamp":"1674081667238697959", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667241404442", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667243155079", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"2"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667250570407", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667251795605", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"16386"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"LOWER_LAYER_DOWN"}}]}}, + {"syncResponse":true} +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/tagging_name_based/telegraf.conf b/plugins/inputs/gnmi/testcases/tagging_name_based/telegraf.conf new file mode 100644 index 000000000..774bae2e9 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_name_based/telegraf.conf @@ -0,0 +1,15 @@ +[[inputs.gnmi]] + addresses = ["dummy"] + redial = "10s" + + [[inputs.gnmi.tag_subscription]] + name = "descr" + origin = "openconfig" + path = "/interfaces/interface/subinterfaces/subinterface/state/description" + subscription_mode = "on_change" + + [[inputs.gnmi.subscription]] + name = "interfaces_logical_status" + origin = "openconfig" + path = "/interfaces/interface/subinterfaces/subinterface/state/oper-status" + subscription_mode = "on_change" \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/tagging_name_based_old/expected.out b/plugins/inputs/gnmi/testcases/tagging_name_based_old/expected.out new file mode 100644 index 000000000..40343f3fe --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_name_based_old/expected.out @@ -0,0 +1,5 @@ +interfaces_logical_status,descr/description=Loopback,index=0,name=lo0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667238697959 +interfaces_logical_status,descr/description=Local:Mgmt,index=312,name=irb,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667241404442 +interfaces_logical_status,descr/description=Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel},index=2,name=gr-0/0/0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667243155079 +interfaces_logical_status,descr/description=Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345},index=1410,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250570407 +interfaces_logical_status,descr/description=Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890},index=16386,name=xe-0/1/5,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="LOWER_LAYER_DOWN" 1674081667251795605 diff --git a/plugins/inputs/gnmi/testcases/tagging_name_based_old/responses.json b/plugins/inputs/gnmi/testcases/tagging_name_based_old/responses.json new file mode 100644 index 000000000..cec8fea82 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_name_based_old/responses.json @@ -0,0 +1,13 @@ +[ + {"update":{"timestamp":"1674081667224189253", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Loopback"}}]}}, + {"update":{"timestamp":"1674081667226968153", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Local:Mgmt"}}]}}, + {"update":{"timestamp":"1674081667228936729", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"3"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel}"}}]}}, + {"update":{"timestamp":"1674081667236178737", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345}"}}]}}, + {"update":{"timestamp":"1674081667236377628", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890}"}}]}}, + {"update":{"timestamp":"1674081667238697959", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667241404442", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667243155079", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"2"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667250570407", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667251795605", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"16386"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"LOWER_LAYER_DOWN"}}]}}, + {"syncResponse":true} +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/tagging_name_based_old/telegraf.conf b/plugins/inputs/gnmi/testcases/tagging_name_based_old/telegraf.conf new file mode 100644 index 000000000..73f2df25c --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_name_based_old/telegraf.conf @@ -0,0 +1,16 @@ +[[inputs.gnmi]] + addresses = ["dummy"] + redial = "10s" + + [[inputs.gnmi.subscription]] + name = "descr" + origin = "openconfig" + path = "/interfaces/interface/subinterfaces/subinterface/state/description" + subscription_mode = "on_change" + tag_only = true + + [[inputs.gnmi.subscription]] + name = "interfaces_logical_status" + origin = "openconfig" + path = "/interfaces/interface/subinterfaces/subinterface/state/oper-status" + subscription_mode = "on_change" \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/tagging_subinterfaces/expected.out b/plugins/inputs/gnmi/testcases/tagging_subinterfaces/expected.out new file mode 100644 index 000000000..aa0a7a301 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_subinterfaces/expected.out @@ -0,0 +1,3 @@ +interfaces_logical_status,descr/description=Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345},index=1410,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250570407 +interfaces_logical_status,descr/description=Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890},index=1412,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250784367 +interfaces_logical_status,index=32767,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250994907 diff --git a/plugins/inputs/gnmi/testcases/tagging_subinterfaces/responses.json b/plugins/inputs/gnmi/testcases/tagging_subinterfaces/responses.json new file mode 100644 index 000000000..7b6b17cb6 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_subinterfaces/responses.json @@ -0,0 +1,9 @@ +[ + {"update":{"timestamp":"1674081667236178737", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345}"}}]}}, + {"update":{"timestamp":"1674081667236377628", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890}"}}]}}, + {"update":{"timestamp":"1674081667236582084", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"32767"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":""}}]}}, + {"update":{"timestamp":"1674081667250570407", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667250784367", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"update":{"timestamp":"1674081667250994907", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"32767"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}}, + {"syncResponse":true} +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/tagging_subinterfaces/telegraf.conf b/plugins/inputs/gnmi/testcases/tagging_subinterfaces/telegraf.conf new file mode 100644 index 000000000..89af35897 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/tagging_subinterfaces/telegraf.conf @@ -0,0 +1,16 @@ +[[inputs.gnmi]] + addresses = ["dummy"] + redial = "10s" + + [[inputs.gnmi.tag_subscription]] + name = "descr" + origin = "openconfig" + path = "/interfaces/interface/subinterfaces/subinterface/state/description" + subscription_mode = "on_change" + elements = ["interface", "subinterface"] + + [[inputs.gnmi.subscription]] + name = "interfaces_logical_status" + origin = "openconfig" + path = "/interfaces/interface/subinterfaces/subinterface/state/oper-status" + subscription_mode = "on_change" diff --git a/plugins/inputs/gnmi/utils.go b/plugins/inputs/gnmi/utils.go new file mode 100644 index 000000000..2041ea838 --- /dev/null +++ b/plugins/inputs/gnmi/utils.go @@ -0,0 +1,155 @@ +package gnmi + +import ( + "bytes" + "encoding/json" + "fmt" + "math" + "strings" + + gnmiLib "github.com/openconfig/gnmi/proto/gnmi" + + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" +) + +// Parse path to path-buffer and tag-field +func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) { + builder := bytes.NewBufferString(prefix) + + // Some devices do report the origin in the first path element + // so try to find out if this is the case. + if gnmiPath.Origin == "" && len(gnmiPath.Elem) > 0 { + groups := originPattern.FindStringSubmatch(gnmiPath.Elem[0].Name) + if len(groups) == 2 { + gnmiPath.Origin = groups[1] + gnmiPath.Elem[0].Name = gnmiPath.Elem[0].Name[len(groups[1])+1:] + } + } + + // Prefix with origin + if len(gnmiPath.Origin) > 0 { + if _, err := builder.WriteString(gnmiPath.Origin); err != nil { + return "", "", err + } + if _, err := builder.WriteRune(':'); err != nil { + return "", "", err + } + } + + // Parse generic keys from prefix + for _, elem := range gnmiPath.Elem { + if len(elem.Name) > 0 { + if _, err := builder.WriteRune('/'); err != nil { + return "", "", err + } + if _, err := builder.WriteString(elem.Name); err != nil { + return "", "", err + } + } + name := builder.String() + + if _, exists := aliases[name]; exists { + aliasPath = name + } + + if tags != nil { + for key, val := range elem.Key { + key = strings.ReplaceAll(key, "-", "_") + + // Use short-form of key if possible + if _, exists := tags[key]; exists { + tags[name+"/"+key] = val + } else { + tags[key] = val + } + } + } + } + + return builder.String(), aliasPath, nil +} + +// equalPathNoKeys checks if two gNMI paths are equal, without keys +func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool { + if len(a.Elem) != len(b.Elem) { + return false + } + for i := range a.Elem { + if a.Elem[i].Name != b.Elem[i].Name { + return false + } + } + return true +} + +func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem { + var newPath []*gnmiLib.PathElem + for _, elem := range gpath.Elem { + if elem.Key != nil { + newPath = append(newPath, elem) + } + } + return newPath +} + +func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path { + if prefix == nil { + return gpath + } + fullPath := new(gnmiLib.Path) + fullPath.Origin = prefix.Origin + fullPath.Target = prefix.Target + fullPath.Elem = append(prefix.Elem, gpath.Elem...) + return fullPath +} + +func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) { + var value interface{} + var jsondata []byte + + // Make sure a value is actually set + if updateVal == nil || updateVal.Value == nil { + return nil, nil + } + + switch val := updateVal.Value.(type) { + case *gnmiLib.TypedValue_AsciiVal: + value = val.AsciiVal + case *gnmiLib.TypedValue_BoolVal: + value = val.BoolVal + case *gnmiLib.TypedValue_BytesVal: + value = val.BytesVal + case *gnmiLib.TypedValue_DoubleVal: + value = val.DoubleVal + case *gnmiLib.TypedValue_DecimalVal: + //nolint:staticcheck // to maintain backward compatibility with older gnmi specs + value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision)) + case *gnmiLib.TypedValue_FloatVal: + //nolint:staticcheck // to maintain backward compatibility with older gnmi specs + value = val.FloatVal + case *gnmiLib.TypedValue_IntVal: + value = val.IntVal + case *gnmiLib.TypedValue_StringVal: + value = val.StringVal + case *gnmiLib.TypedValue_UintVal: + value = val.UintVal + case *gnmiLib.TypedValue_JsonIetfVal: + jsondata = val.JsonIetfVal + case *gnmiLib.TypedValue_JsonVal: + jsondata = val.JsonVal + } + + fields := make(map[string]interface{}) + if value != nil { + fields[name] = value + } else if jsondata != nil { + if err := json.Unmarshal(jsondata, &value); err != nil { + return nil, fmt.Errorf("failed to parse JSON value: %v", err) + } + flattener := jsonparser.JSONFlattener{Fields: fields} + if err := flattener.FullFlattenJSON(name, value, true, true); err != nil { + return nil, fmt.Errorf("failed to flatten JSON: %v", err) + } + } + return fields, nil +}