fix(inputs.gnmi): Handle both new-style `tag_subscription` and old-style `tag_only` (#12512)

This commit is contained in:
Sven Rebhan 2023-02-08 19:30:05 +01:00 committed by GitHub
parent 8af579752f
commit 58a01e1daf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1146 additions and 682 deletions

View File

@ -91,14 +91,29 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# [[inputs.gnmi.tag_subscription]]
# ## When applying this value as a tag to other metrics, use this tag name
# name = "descr"
#
# ## All other subscription fields are as normal
# origin = "openconfig-interfaces"
# path = "/interfaces/interface/state"
# subscription_mode = "on_change"
# ## At least one path element name must be supplied that contains at least
# ## one key to match on. Multiple element names can be specified in any
# ## order. In this case all element names must be present.
# elements = ["description", "interface"]
#
# ## Match strategy to use for the tag.
# ## Tags are only applied for metrics of the same address. The following
# ## settings are valid:
# ## unconditional -- always match
# ## name -- match by the "name" key
# ## This resembles the previsou 'tag-only' behavior.
# ## elements -- match by the keys in the path filtered by the path
# ## parts specified `elements` below
# ## By default, 'elements' is used if the 'elements' option is provided,
# ## otherwise match by 'name'.
# # match = ""
#
# ## For the 'elements' match strategy, at least one path-element name must
# ## be supplied containing at least one key to match on. Multiple path
# ## elements can be specified in any order. All given keys must be equal
# ## for a match.
# # elements = ["description", "interface"]
```
## Metrics

View File

@ -2,15 +2,10 @@
package gnmi
import (
"bytes"
"context"
"crypto/tls"
_ "embed"
"encoding/json"
"fmt"
"io"
"math"
"net"
"path"
"regexp"
"strings"
@ -19,17 +14,12 @@ import (
"github.com/google/gnxi/utils/xpath"
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
//go:embed sample.conf
@ -52,52 +42,25 @@ type GNMI struct {
Subscriptions []Subscription `toml:"subscription"`
TagSubscriptions []TagSubscription `toml:"tag_subscription"`
Aliases map[string]string `toml:"aliases"`
Encoding string `toml:"encoding"`
Origin string `toml:"origin"`
Prefix string `toml:"prefix"`
Target string `toml:"target"`
UpdatesOnly bool `toml:"updates_only"`
Username string `toml:"username"`
Password string `toml:"password"`
Redial config.Duration `toml:"redial"`
MaxMsgSize config.Size `toml:"max_msg_size"`
// Optional subscription configuration
Encoding string
Origin string
Prefix string
Target string
UpdatesOnly bool `toml:"updates_only"`
// gNMI target credentials
Username string
Password string
// Redial
Redial config.Duration
// GRPC TLS settings
EnableTLS bool `toml:"enable_tls"`
Trace bool `toml:"dump_responses"`
EnableTLS bool `toml:"enable_tls"`
Log telegraf.Logger `toml:"-"`
internaltls.ClientConfig
// Internal state
internalAliases map[string]string
acc telegraf.Accumulator
cancel context.CancelFunc
wg sync.WaitGroup
legacyTags bool
emptyNameWarnShown bool
Log telegraf.Logger
}
type Worker struct {
address string
tagStore *tagNode
}
type tagNode struct {
elem *gnmiLib.PathElem
tagName string
value *gnmiLib.TypedValue
tagStore map[string][]*tagNode
}
type tagResults struct {
names []string
values []*gnmiLib.TypedValue
internalAliases map[string]string
acc telegraf.Accumulator
cancel context.CancelFunc
wg sync.WaitGroup
}
// Subscription for a gNMI client
@ -123,6 +86,7 @@ type Subscription struct {
// Tag Subscription for a gNMI client
type TagSubscription struct {
Subscription
Match string `toml:"match"`
Elements []string
}
@ -141,13 +105,15 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
for i := len(c.Subscriptions) - 1; i >= 0; i-- {
subscription := c.Subscriptions[i]
// Support legacy TagOnly subscriptions
// Support and convert legacy TagOnly subscriptions
if subscription.TagOnly {
tagSub := convertTagOnlySubscription(subscription)
tagSub := TagSubscription{
Subscription: subscription,
Match: "name",
}
c.TagSubscriptions = append(c.TagSubscriptions, tagSub)
// Remove from the original subscriptions list
c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...)
c.legacyTags = true
continue
}
if err = subscription.buildFullPath(c); err != nil {
@ -161,8 +127,21 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly {
return fmt.Errorf("do not mix legacy tag_only subscriptions and tag subscriptions")
}
if len(c.TagSubscriptions[idx].Elements) == 0 {
return fmt.Errorf("tag_subscription must have at least one element")
switch c.TagSubscriptions[idx].Match {
case "":
if len(c.TagSubscriptions[idx].Elements) > 0 {
c.TagSubscriptions[idx].Match = "elements"
} else {
c.TagSubscriptions[idx].Match = "name"
}
case "unconditional":
case "name":
case "elements":
if len(c.TagSubscriptions[idx].Elements) == 0 {
return fmt.Errorf("tag_subscription must have at least one element")
}
default:
return fmt.Errorf("unknown match type %q for tag-subscription %q", c.TagSubscriptions[idx].Match, c.TagSubscriptions[idx].Name)
}
}
@ -206,12 +185,11 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
// Create a goroutine for each device, dial and subscribe
c.wg.Add(len(c.Addresses))
for _, addr := range c.Addresses {
worker := Worker{address: addr}
worker.tagStore = &tagNode{}
go func(worker Worker) {
go func(addr string) {
defer c.wg.Done()
h := newHandler(addr, c.internalAliases, c.TagSubscriptions, int(c.MaxMsgSize), c.Log, c.Trace)
for ctx.Err() == nil {
if err := c.subscribeGNMI(ctx, &worker, tlscfg, request); err != nil && ctx.Err() == nil {
if err := h.subscribeGNMI(ctx, acc, tlscfg, request); err != nil && ctx.Err() == nil {
acc.AddError(err)
}
@ -220,7 +198,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
case <-time.After(time.Duration(c.Redial)):
}
}
}(worker)
}(addr)
}
return nil
}
@ -291,239 +269,6 @@ func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) {
}, nil
}
// SubscribeGNMI and extract telemetry data
func (c *GNMI) subscribeGNMI(ctx context.Context, worker *Worker, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
var creds credentials.TransportCredentials
if tlscfg != nil {
creds = credentials.NewTLS(tlscfg)
} else {
creds = insecure.NewCredentials()
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
if c.MaxMsgSize > 0 {
opts = append(opts, grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(int(c.MaxMsgSize)),
))
}
client, err := grpc.DialContext(ctx, worker.address, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %v", err)
}
defer client.Close()
subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx)
if err != nil {
return fmt.Errorf("failed to setup subscription: %v", err)
}
if err = subscribeClient.Send(request); err != nil {
// If io.EOF is returned, the stream may have ended and stream status
// can be determined by calling Recv.
if err != io.EOF {
return fmt.Errorf("failed to send subscription request: %v", err)
}
}
c.Log.Debugf("Connection to gNMI device %s established", worker.address)
defer c.Log.Debugf("Connection to gNMI device %s closed", worker.address)
for ctx.Err() == nil {
var reply *gnmiLib.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil {
if err != io.EOF && ctx.Err() == nil {
return fmt.Errorf("aborted gNMI subscription: %v", err)
}
break
}
c.handleSubscribeResponse(worker, reply)
}
return nil
}
func (c *GNMI) handleSubscribeResponse(worker *Worker, reply *gnmiLib.SubscribeResponse) {
if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok {
c.handleSubscribeResponseUpdate(worker, response)
}
}
// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.SubscribeResponse_Update) {
var prefix, prefixAliasPath string
grouper := metric.NewSeriesGrouper()
timestamp := time.Unix(0, response.Update.Timestamp)
prefixTags := make(map[string]string)
if response.Update.Prefix != nil {
var err error
if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, c.internalAliases, ""); err != nil {
c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
}
}
prefixTags["source"], _, _ = net.SplitHostPort(worker.address)
prefixTags["path"] = prefix
// Process and remove tag-only updates from the response
for i := len(response.Update.Update) - 1; i >= 0; i-- {
update := response.Update.Update[i]
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
for _, tagSub := range c.TagSubscriptions {
if equalPathNoKeys(fullPath, tagSub.fullPath) {
c.Log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update)
worker.storeTags(update, tagSub)
response.Update.Update = append(response.Update.Update[:i], response.Update.Update[i+1:]...)
}
}
}
// Parse individual Update message and create measurements
var name, lastAliasPath string
for _, update := range response.Update.Update {
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
// Prepare tags from prefix
tags := make(map[string]string, len(prefixTags))
for key, val := range prefixTags {
tags[key] = val
}
aliasPath, fields := c.handleTelemetryField(update, tags, prefix)
if tagOnlyTags := worker.checkTags(fullPath); tagOnlyTags != nil {
for k, v := range tagOnlyTags {
if alias, ok := c.internalAliases[k]; ok {
tags[alias] = fmt.Sprint(v)
} else {
tags[k] = fmt.Sprint(v)
}
}
}
// Inherent valid alias from prefix parsing
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
aliasPath = prefixAliasPath
}
// Lookup alias if alias-path has changed
if aliasPath != lastAliasPath {
name = prefix
if alias, ok := c.internalAliases[aliasPath]; ok {
name = alias
} else {
c.Log.Debugf("No measurement alias for gNMI path: %s", name)
}
}
// Check for empty names
if name == "" && !c.emptyNameWarnShown {
c.Log.Warnf(emptyNameWarning, response.Update)
c.emptyNameWarnShown = true
}
// Group metrics
for k, v := range fields {
key := k
if len(aliasPath) < len(key) && len(aliasPath) != 0 {
// This may not be an exact prefix, due to naming style
// conversion on the key.
key = key[len(aliasPath)+1:]
} else if len(aliasPath) >= len(key) {
// Otherwise use the last path element as the field key.
key = path.Base(key)
// If there are no elements skip the item; this would be an
// invalid message.
key = strings.TrimLeft(key, "/.")
if key == "" {
c.Log.Errorf("invalid empty path: %q", k)
continue
}
}
grouper.Add(name, tags, timestamp, key, v)
}
lastAliasPath = aliasPath
}
// Add grouped measurements
for _, metricToAdd := range grouper.Metrics() {
c.acc.AddMetric(metricToAdd)
}
}
// HandleTelemetryField and add it to a measurement
func (c *GNMI) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
gpath, aliasPath, err := handlePath(update.Path, tags, c.internalAliases, prefix)
if err != nil {
c.Log.Errorf("handling path %q failed: %v", update.Path, err)
}
fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val)
if err != nil {
c.Log.Errorf("error parsing update value %q: %v", update.Val, err)
}
return aliasPath, fields
}
// Parse path to path-buffer and tag-field
func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) {
builder := bytes.NewBufferString(prefix)
// Some devices do report the origin in the first path element
// so try to find out if this is the case.
if gnmiPath.Origin == "" && len(gnmiPath.Elem) > 0 {
groups := originPattern.FindStringSubmatch(gnmiPath.Elem[0].Name)
if len(groups) == 2 {
gnmiPath.Origin = groups[1]
gnmiPath.Elem[0].Name = gnmiPath.Elem[0].Name[len(groups[1])+1:]
}
}
// Prefix with origin
if len(gnmiPath.Origin) > 0 {
if _, err := builder.WriteString(gnmiPath.Origin); err != nil {
return "", "", err
}
if _, err := builder.WriteRune(':'); err != nil {
return "", "", err
}
}
// Parse generic keys from prefix
for _, elem := range gnmiPath.Elem {
if len(elem.Name) > 0 {
if _, err := builder.WriteRune('/'); err != nil {
return "", "", err
}
if _, err := builder.WriteString(elem.Name); err != nil {
return "", "", err
}
}
name := builder.String()
if _, exists := aliases[name]; exists {
aliasPath = name
}
if tags != nil {
for key, val := range elem.Key {
key = strings.ReplaceAll(key, "-", "_")
// Use short-form of key if possible
if _, exists := tags[key]; exists {
tags[name+"/"+key] = val
} else {
tags[key] = val
}
}
}
}
return builder.String(), aliasPath, nil
}
// ParsePath from XPath-like string to gNMI path structure
func parsePath(origin string, pathToParse string, target string) (*gnmiLib.Path, error) {
gnmiPath, err := xpath.ToGNMIPath(pathToParse)
@ -559,45 +304,6 @@ func init() {
inputs.Add("cisco_telemetry_gnmi", New)
}
func convertTagOnlySubscription(s Subscription) TagSubscription {
t := TagSubscription{Subscription: s, Elements: []string{"interface"}}
return t
}
// equalPathNoKeys checks if two gNMI paths are equal, without keys
func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool {
if len(a.Elem) != len(b.Elem) {
return false
}
for i := range a.Elem {
if a.Elem[i].Name != b.Elem[i].Name {
return false
}
}
return true
}
func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem {
var newPath []*gnmiLib.PathElem
for _, elem := range gpath.Elem {
if elem.Key != nil {
newPath = append(newPath, elem)
}
}
return newPath
}
func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path {
if prefix == nil {
return gpath
}
fullPath := new(gnmiLib.Path)
fullPath.Origin = prefix.Origin
fullPath.Target = prefix.Target
fullPath.Elem = append(prefix.Elem, gpath.Elem...)
return fullPath
}
func (s *Subscription) buildFullPath(c *GNMI) error {
var err error
if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil {
@ -618,81 +324,6 @@ func (s *Subscription) buildFullPath(c *GNMI) error {
return nil
}
func (w *Worker) storeTags(update *gnmiLib.Update, sub TagSubscription) {
updateKeys := pathKeys(update.Path)
var foundKey bool
for _, requiredKey := range sub.Elements {
foundKey = false
for _, elem := range updateKeys {
if elem.Name == requiredKey {
foundKey = true
}
}
if !foundKey {
return
}
}
// All required keys present for this TagSubscription
w.tagStore.insert(updateKeys, sub.Name, update.Val)
}
func (node *tagNode) insert(keys []*gnmiLib.PathElem, name string, value *gnmiLib.TypedValue) {
if len(keys) == 0 {
node.value = value
node.tagName = name
return
}
var found *tagNode
key := keys[0]
keyName := key.Name
if node.tagStore == nil {
node.tagStore = make(map[string][]*tagNode)
}
if _, ok := node.tagStore[keyName]; !ok {
node.tagStore[keyName] = make([]*tagNode, 0)
}
for _, node := range node.tagStore[keyName] {
if compareKeys(node.elem.Key, key.Key) {
found = node
break
}
}
if found == nil {
found = &tagNode{elem: keys[0]}
node.tagStore[keyName] = append(node.tagStore[keyName], found)
}
found.insert(keys[1:], name, value)
}
func (node *tagNode) retrieve(keys []*gnmiLib.PathElem, tagResults *tagResults) {
if node.value != nil {
tagResults.names = append(tagResults.names, node.tagName)
tagResults.values = append(tagResults.values, node.value)
}
for _, key := range keys {
if elems, ok := node.tagStore[key.Name]; ok {
for _, node := range elems {
if compareKeys(node.elem.Key, key.Key) {
node.retrieve(keys, tagResults)
}
}
}
}
}
func (w *Worker) checkTags(fullPath *gnmiLib.Path) map[string]interface{} {
results := &tagResults{}
w.tagStore.retrieve(pathKeys(fullPath), results)
tags := make(map[string]interface{})
for idx := range results.names {
vals, _ := gnmiToFields(results.names[idx], results.values[idx])
for k, v := range vals {
tags[k] = v
}
}
return tags
}
func (s *Subscription) buildAlias(aliases map[string]string) error {
var err error
var gnmiLongPath, gnmiShortPath *gnmiLib.Path
@ -725,69 +356,3 @@ func (s *Subscription) buildAlias(aliases map[string]string) error {
}
return nil
}
func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) {
var value interface{}
var jsondata []byte
// Make sure a value is actually set
if updateVal == nil || updateVal.Value == nil {
return nil, nil
}
switch val := updateVal.Value.(type) {
case *gnmiLib.TypedValue_AsciiVal:
value = val.AsciiVal
case *gnmiLib.TypedValue_BoolVal:
value = val.BoolVal
case *gnmiLib.TypedValue_BytesVal:
value = val.BytesVal
case *gnmiLib.TypedValue_DoubleVal:
value = val.DoubleVal
case *gnmiLib.TypedValue_DecimalVal:
//nolint:staticcheck // to maintain backward compatibility with older gnmi specs
value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision))
case *gnmiLib.TypedValue_FloatVal:
//nolint:staticcheck // to maintain backward compatibility with older gnmi specs
value = val.FloatVal
case *gnmiLib.TypedValue_IntVal:
value = val.IntVal
case *gnmiLib.TypedValue_StringVal:
value = val.StringVal
case *gnmiLib.TypedValue_UintVal:
value = val.UintVal
case *gnmiLib.TypedValue_JsonIetfVal:
jsondata = val.JsonIetfVal
case *gnmiLib.TypedValue_JsonVal:
jsondata = val.JsonVal
}
fields := make(map[string]interface{})
if value != nil {
fields[name] = value
} else if jsondata != nil {
if err := json.Unmarshal(jsondata, &value); err != nil {
return nil, fmt.Errorf("failed to parse JSON value: %v", err)
}
flattener := jsonparser.JSONFlattener{Fields: fields}
if err := flattener.FullFlattenJSON(name, value, true, true); err != nil {
return nil, fmt.Errorf("failed to flatten JSON: %v", err)
}
}
return fields, nil
}
func compareKeys(a map[string]string, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for k, v := range a {
if _, ok := b[k]; !ok {
return false
}
if b[k] != v {
return false
}
}
return true
}

View File

@ -2,9 +2,12 @@ package gnmi
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"sync"
"testing"
"time"
@ -13,9 +16,12 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/encoding/protojson"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -96,8 +102,9 @@ func TestWaitError(t *testing.T) {
grpcServer.Stop()
wg.Wait()
require.Contains(t, acc.Errors,
errors.New("aborted gNMI subscription: rpc error: code = Unknown desc = testerror"))
// Check if the expected error text is among the errors
require.Len(t, acc.Errors, 1)
require.ErrorContains(t, acc.Errors[0], "aborted gNMI subscription: rpc error: code = Unknown desc = testerror")
}
func TestUsernamePassword(t *testing.T) {
@ -154,8 +161,9 @@ func TestUsernamePassword(t *testing.T) {
grpcServer.Stop()
wg.Wait()
require.Contains(t, acc.Errors,
errors.New("aborted gNMI subscription: rpc error: code = Unknown desc = success"))
// Check if the expected error text is among the errors
require.Len(t, acc.Errors, 1)
require.ErrorContains(t, acc.Errors[0], "aborted gNMI subscription: rpc error: code = Unknown desc = success")
}
func mockGNMINotification() *gnmiLib.Notification {
@ -478,10 +486,9 @@ func TestNotification(t *testing.T) {
testutil.MustMetric(
"oc-intf-counters",
map[string]string{
"path": "",
"source": "127.0.0.1",
"name": "Ethernet1",
"oc-intf-desc": "foo",
"source": "127.0.0.1",
"name": "Ethernet1",
"oc-intf-desc/description": "foo",
},
map[string]interface{}{
"in_broadcast_pkts": 42,
@ -626,6 +633,7 @@ func TestNotification(t *testing.T) {
},
},
}
return server.Send(taggedResponse)
},
},
@ -633,11 +641,10 @@ func TestNotification(t *testing.T) {
testutil.MustMetric(
"oc-neigh-state",
map[string]string{
"path": "",
"source": "127.0.0.1",
"neighbor_address": "192.0.2.1",
"name": "default",
"oc-neigh-desc": "EXAMPLE-PEER",
"source": "127.0.0.1",
"neighbor_address": "192.0.2.1",
"name": "default",
"oc-neigh-desc/description": "EXAMPLE-PEER",
"/network-instances/network-instance/protocols/protocol/name": "BGP",
"identifier": "BGP",
},
@ -1011,193 +1018,128 @@ func TestRedial(t *testing.T) {
wg.Wait()
}
func TestTagNode(t *testing.T) {
type insertOp struct {
keys []*gnmiLib.PathElem
name string
value *gnmiLib.TypedValue
}
interfaceElemSingleKey := &gnmiLib.PathElem{
Name: "interface",
Key: map[string]string{"name": "Management0"},
}
networkInstanceSingleKey := &gnmiLib.PathElem{
Name: "network-instance",
Key: map[string]string{"name": "default"},
}
protocolDoubleKey := &gnmiLib.PathElem{
Name: "protocol",
Key: map[string]string{"name": "BGP", "protocol": "BGP"},
}
neighborSingleKey := &gnmiLib.PathElem{
Name: "neighbor",
Key: map[string]string{"neighbor_address": "192.0.2.1"},
}
tests := []struct {
name string
insertOps []insertOp
expected *tagNode
}{
{
name: "single elem single key insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{interfaceElemSingleKey},
name: "tagFoo",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"interface": {
{
elem: interfaceElemSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_IntVal{IntVal: 1}},
tagName: "tagFoo",
},
},
},
},
},
{
name: "double elem single key insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{interfaceElemSingleKey, networkInstanceSingleKey},
name: "tagBar",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"interface": {
{
elem: interfaceElemSingleKey,
tagStore: map[string][]*tagNode{
"network-instance": {
{
elem: networkInstanceSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "rocks"}},
tagName: "tagBar",
},
},
},
},
},
},
},
},
{
name: "single elem double key insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{protocolDoubleKey},
name: "doubleKey",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"protocol": {
{
elem: protocolDoubleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_JsonVal{JsonVal: []byte("{}")}},
tagName: "doubleKey",
},
},
},
},
},
{
name: "multi elem unrelated insert",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{interfaceElemSingleKey},
name: "intf_desc",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}},
},
{
keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey, neighborSingleKey},
name: "bgp_neigh_desc",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"interface": {
{
elem: interfaceElemSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "mgmt"}},
tagName: "intf_desc",
},
},
"network-instance": {
{
elem: networkInstanceSingleKey,
tagStore: map[string][]*tagNode{
"protocol": {
{
elem: protocolDoubleKey,
tagStore: map[string][]*tagNode{
"neighbor": {
{
elem: neighborSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "example-neighbor"}},
tagName: "bgp_neigh_desc",
},
},
},
},
},
},
},
},
},
},
},
{
name: "values at multiple levels",
insertOps: []insertOp{
{
keys: []*gnmiLib.PathElem{networkInstanceSingleKey},
name: "vrf_stuff",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}},
},
{
keys: []*gnmiLib.PathElem{networkInstanceSingleKey, protocolDoubleKey},
name: "protocol_stuff",
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}},
},
},
expected: &tagNode{
tagStore: map[string][]*tagNode{
"network-instance": {
{
elem: networkInstanceSingleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "foo"}},
tagName: "vrf_stuff",
tagStore: map[string][]*tagNode{
"protocol": {
{
elem: protocolDoubleKey,
value: &gnmiLib.TypedValue{Value: &gnmiLib.TypedValue_StringVal{StringVal: "bar"}},
tagName: "protocol_stuff",
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rootNode := new(tagNode)
for _, s := range tt.insertOps {
rootNode.insert(s.keys, s.name, s.value)
func TestCases(t *testing.T) {
// Get all testcase directories
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
// Register the plugin
inputs.Add("gnmi", New)
for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}
t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
configFilename := filepath.Join(testcasePath, "telegraf.conf")
inputFilename := filepath.Join(testcasePath, "responses.json")
expectedFilename := filepath.Join(testcasePath, "expected.out")
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
// Load the input data
buf, err := os.ReadFile(inputFilename)
require.NoError(t, err)
var entries []json.RawMessage
require.NoError(t, json.Unmarshal(buf, &entries))
responses := make([]gnmiLib.SubscribeResponse, len(entries))
for i, entry := range entries {
require.NoError(t, protojson.Unmarshal(entry, &responses[i]))
}
require.Equal(t, rootNode, tt.expected)
// Prepare the influx parser for expectations
parser := &influx.Parser{}
require.NoError(t, parser.Init())
// Read the expected output if any
var expected []telegraf.Metric
if _, err := os.Stat(expectedFilename); err == nil {
var err error
expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
require.NoError(t, err)
}
// Read the expected output if any
var expectedErrors []string
if _, err := os.Stat(expectedErrorFilename); err == nil {
var err error
expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
require.NoError(t, err)
require.NotEmpty(t, expectedErrors)
}
// Configure the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfig(configFilename))
require.Len(t, cfg.Inputs, 1)
// Prepare the server response
responseFunction := func(server gnmiLib.GNMI_SubscribeServer) error {
sync := &gnmiLib.SubscribeResponse{
Response: &gnmiLib.SubscribeResponse_SyncResponse{
SyncResponse: true,
},
}
_ = sync
for i := range responses {
if err := server.Send(&responses[i]); err != nil {
return err
}
}
return nil
}
// Setup a mock server
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
grpcServer := grpc.NewServer()
gnmiServer := &MockServer{
SubscribeF: responseFunction,
GRPCServer: grpcServer,
}
gnmiLib.RegisterGNMIServer(grpcServer, gnmiServer)
// Setup the plugin
plugin := cfg.Inputs[0].Input.(*GNMI)
plugin.Addresses = []string{listener.Addr().String()}
plugin.Log = testutil.Logger{}
// Start the server
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := grpcServer.Serve(listener)
require.NoError(t, err)
}()
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
require.Eventually(t,
func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, 1*time.Second, 100*time.Millisecond)
plugin.Stop()
grpcServer.Stop()
wg.Wait()
// Check for errors
require.Len(t, acc.Errors, len(expectedErrors))
if len(acc.Errors) > 0 {
var actualErrorMsgs []string
for _, err := range acc.Errors {
actualErrorMsgs = append(actualErrorMsgs, err.Error())
}
require.ElementsMatch(t, actualErrorMsgs, expectedErrors)
}
// Check the metric nevertheless as we might get some metrics despite errors.
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
})
}
}

View File

@ -0,0 +1,238 @@
package gnmi
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"path"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
)
type handler struct {
address string
aliases map[string]string
tagsubs []TagSubscription
maxMsgSize int
emptyNameWarnShown bool
tagStore *tagStore
trace bool
log telegraf.Logger
}
func newHandler(addr string, aliases map[string]string, subs []TagSubscription, maxsize int, l telegraf.Logger, trace bool) *handler {
return &handler{
address: addr,
aliases: aliases,
tagsubs: subs,
maxMsgSize: maxsize,
tagStore: newTagStore(subs),
trace: trace,
log: l,
}
}
// SubscribeGNMI and extract telemetry data
func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
var creds credentials.TransportCredentials
if tlscfg != nil {
creds = credentials.NewTLS(tlscfg)
} else {
creds = insecure.NewCredentials()
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
if h.maxMsgSize > 0 {
opts = append(opts, grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(h.maxMsgSize),
))
}
client, err := grpc.DialContext(ctx, h.address, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %v", err)
}
defer client.Close()
subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx)
if err != nil {
return fmt.Errorf("failed to setup subscription: %w", err)
}
// If io.EOF is returned, the stream may have ended and stream status
// can be determined by calling Recv.
if err := subscribeClient.Send(request); err != nil && err != io.EOF {
return fmt.Errorf("failed to send subscription request: %w", err)
}
h.log.Debugf("Connection to gNMI device %s established", h.address)
defer h.log.Debugf("Connection to gNMI device %s closed", h.address)
for ctx.Err() == nil {
var reply *gnmiLib.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil {
if err != io.EOF && ctx.Err() == nil {
return fmt.Errorf("aborted gNMI subscription: %w", err)
}
break
}
if h.trace {
buf, err := protojson.Marshal(reply)
if err != nil {
h.log.Debugf("marshal failed: %v", err)
} else {
t := reply.GetUpdate().GetTimestamp()
h.log.Debugf("update_%v: %s", t, string(buf))
}
}
if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok {
h.handleSubscribeResponseUpdate(acc, response)
}
}
return nil
}
// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, response *gnmiLib.SubscribeResponse_Update) {
var prefix, prefixAliasPath string
grouper := metric.NewSeriesGrouper()
timestamp := time.Unix(0, response.Update.Timestamp)
prefixTags := make(map[string]string)
if response.Update.Prefix != nil {
var err error
if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, h.aliases, ""); err != nil {
h.log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
}
}
prefixTags["source"], _, _ = net.SplitHostPort(h.address)
if prefix != "" {
prefixTags["path"] = prefix
}
// Process and remove tag-updates from the response first so we will
// add all available tags to the metrics later.
var valueUpdates []*gnmiLib.Update
for _, update := range response.Update.Update {
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
// Prepare tags from prefix
tags := make(map[string]string, len(prefixTags))
for key, val := range prefixTags {
tags[key] = val
}
_, fields := h.handleTelemetryField(update, tags, prefix)
var tagUpdate bool
for _, tagSub := range h.tagsubs {
if !equalPathNoKeys(fullPath, tagSub.fullPath) {
continue
}
h.log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update)
if err := h.tagStore.insert(tagSub, fullPath, fields, tags); err != nil {
h.log.Errorf("inserting tag failed: %w", err)
}
tagUpdate = true
break
}
if !tagUpdate {
valueUpdates = append(valueUpdates, update)
}
}
// Parse individual Update message and create measurements
var name, lastAliasPath string
for _, update := range valueUpdates {
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
// Prepare tags from prefix
tags := make(map[string]string, len(prefixTags))
for key, val := range prefixTags {
tags[key] = val
}
aliasPath, fields := h.handleTelemetryField(update, tags, prefix)
// Add the tags derived via tag-subscriptions
for k, v := range h.tagStore.lookup(fullPath, tags) {
tags[k] = v
}
// Inherent valid alias from prefix parsing
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
aliasPath = prefixAliasPath
}
// Lookup alias if alias-path has changed
if aliasPath != lastAliasPath {
name = prefix
if alias, ok := h.aliases[aliasPath]; ok {
name = alias
} else {
h.log.Debugf("No measurement alias for gNMI path: %s", name)
}
lastAliasPath = aliasPath
}
// Check for empty names
if name == "" && !h.emptyNameWarnShown {
h.log.Warnf(emptyNameWarning, response.Update)
h.emptyNameWarnShown = true
}
// Group metrics
for k, v := range fields {
key := k
if len(aliasPath) < len(key) && len(aliasPath) != 0 {
// This may not be an exact prefix, due to naming style
// conversion on the key.
key = key[len(aliasPath)+1:]
} else if len(aliasPath) >= len(key) {
// Otherwise use the last path element as the field key.
key = path.Base(key)
// If there are no elements skip the item; this would be an
// invalid message.
key = strings.TrimLeft(key, "/.")
if key == "" {
h.log.Errorf("invalid empty path: %q", k)
continue
}
}
grouper.Add(name, tags, timestamp, key, v)
}
}
// Add grouped measurements
for _, metricToAdd := range grouper.Metrics() {
acc.AddMetric(metricToAdd)
}
}
// HandleTelemetryField and add it to a measurement
func (h *handler) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
gpath, aliasPath, err := handlePath(update.Path, tags, h.aliases, prefix)
if err != nil {
h.log.Errorf("handling path %q failed: %v", update.Path, err)
}
fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val)
if err != nil {
h.log.Errorf("error parsing update value %q: %v", update.Val, err)
}
return aliasPath, fields
}

View File

@ -66,11 +66,26 @@
# [[inputs.gnmi.tag_subscription]]
# ## When applying this value as a tag to other metrics, use this tag name
# name = "descr"
#
# ## All other subscription fields are as normal
# origin = "openconfig-interfaces"
# path = "/interfaces/interface/state"
# subscription_mode = "on_change"
# ## At least one path element name must be supplied that contains at least
# ## one key to match on. Multiple element names can be specified in any
# ## order. In this case all element names must be present.
# elements = ["description", "interface"]
#
# ## Match strategy to use for the tag.
# ## Tags are only applied for metrics of the same address. The following
# ## settings are valid:
# ## unconditional -- always match
# ## name -- match by the "name" key
# ## This resembles the previsou 'tag-only' behavior.
# ## elements -- match by the keys in the path filtered by the path
# ## parts specified `elements` below
# ## By default, 'elements' is used if the 'elements' option is provided,
# ## otherwise match by 'name'.
# # match = ""
#
# ## For the 'elements' match strategy, at least one path-element name must
# ## be supplied containing at least one key to match on. Multiple path
# ## elements can be specified in any order. All given keys must be equal
# ## for a match.
# # elements = ["description", "interface"]

View File

@ -0,0 +1,177 @@
package gnmi
import (
"fmt"
"path/filepath"
"sort"
"strings"
"github.com/influxdata/telegraf/internal"
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
)
type tagStore struct {
unconditional map[string]string
names map[string]map[string]string
elements elementsStore
}
type elementsStore struct {
required [][]string
tags map[string]map[string]string
}
func newTagStore(subs []TagSubscription) *tagStore {
store := tagStore{
unconditional: make(map[string]string),
names: make(map[string]map[string]string),
elements: elementsStore{
required: make([][]string, 0, len(subs)),
tags: make(map[string]map[string]string),
},
}
for _, s := range subs {
if s.Match == "elements" {
store.elements.required = append(store.elements.required, s.Elements)
}
}
return &store
}
// Store tags extracted from TagSubscriptions
func (s *tagStore) insert(subscription TagSubscription, path *gnmiLib.Path, values map[string]interface{}, tags map[string]string) error {
switch subscription.Match {
case "unconditional":
for k, v := range values {
tagName := subscription.Name + "/" + filepath.Base(k)
sv, err := internal.ToString(v)
if err != nil {
return fmt.Errorf("conversion error for %v: %w", v, err)
}
if sv == "" {
delete(s.unconditional, tagName)
} else {
s.unconditional[tagName] = sv
}
}
case "name":
// Get the lookup key
key, found := tags["name"]
if !found {
return nil
}
// Make sure we have a valid map for the key
if _, exists := s.names[key]; !exists {
s.names[key] = make(map[string]string)
}
// Add the values
for k, v := range values {
tagName := subscription.Name + "/" + filepath.Base(k)
sv, err := internal.ToString(v)
if err != nil {
return fmt.Errorf("conversion error for %v: %w", v, err)
}
if sv == "" {
delete(s.names[key], tagName)
} else {
s.names[key][tagName] = sv
}
}
case "elements":
key, match := s.getElementsKeys(path, subscription.Elements)
if !match || len(values) == 0 {
return nil
}
// Make sure we have a valid map for the key
if _, exists := s.elements.tags[key]; !exists {
s.elements.tags[key] = make(map[string]string)
}
// Add the values
for k, v := range values {
tagName := subscription.Name + "/" + filepath.Base(k)
sv, err := internal.ToString(v)
if err != nil {
return fmt.Errorf("conversion error for %v: %w", v, err)
}
if sv == "" {
delete(s.elements.tags[key], tagName)
} else {
s.elements.tags[key][tagName] = sv
}
}
default:
return fmt.Errorf("unknown match strategy %q", subscription.Match)
}
return nil
}
func (s *tagStore) lookup(path *gnmiLib.Path, metricTags map[string]string) map[string]string {
// Add all unconditional tags
tags := make(map[string]string, len(s.unconditional))
for k, v := range s.unconditional {
tags[k] = v
}
// Match names
key, found := metricTags["name"]
if found {
for k, v := range s.names[key] {
tags[k] = v
}
}
// Match elements
for _, requiredKeys := range s.elements.required {
key, match := s.getElementsKeys(path, requiredKeys)
if !match {
continue
}
for k, v := range s.elements.tags[key] {
tags[k] = v
}
}
return tags
}
func (s *tagStore) getElementsKeys(path *gnmiLib.Path, elements []string) (string, bool) {
keyElements := pathKeys(path)
// Search for the required path elements and collect a ordered
// list of their values to in the form
// elementName1={keyA=valueA,keyB=valueB,...},...,elementNameN={keyY=valueY,keyZ=valueZ}
// where each elements' key-value list is enclosed in curly brackets.
keyParts := make([]string, 0, len(elements))
for _, requiredElement := range elements {
var found bool
var elementKVs []string
for _, el := range keyElements {
if el.Name == requiredElement {
for k, v := range el.Key {
elementKVs = append(elementKVs, k+"="+v)
}
found = true
break
}
}
// The element was not found, but all must match
if !found {
return "", false
}
// We need to order the element's key-value pairs as the map
// returns elements in random order
sort.Strings(elementKVs)
// Collect the element
keyParts = append(keyParts, requiredElement+"={"+strings.Join(elementKVs, ",")+"}")
}
return strings.Join(keyParts, ","), true
}

View File

@ -0,0 +1 @@
oc-neigh-state,source=127.0.0.1,neighbor_address=192.0.2.1,name=default,oc-neigh-desc/description=EXAMPLE-PEER,/network-instances/network-instance/protocols/protocol/name=BGP,identifier=BGP session_state="ESTABLISHED" 1543236572000000000

View File

@ -0,0 +1,116 @@
[
{
"update": {
"timestamp": "1543236571000000000",
"prefix": {},
"update": [
{
"path": {
"elem": [
{
"name": "network-instances"
},
{
"name": "network-instance",
"key": {
"name": "default"
}
},
{
"name": "protocols"
},
{
"name": "protocol",
"key": {
"identifier": "BGP",
"name": "BGP"
}
},
{
"name": "bgp"
},
{
"name": "neighbors"
},
{
"name": "neighbor",
"key": {
"neighbor_address": "192.0.2.1"
}
},
{
"name": "state"
},
{
"name": "description"
}
]
},
"val": {
"stringVal": "EXAMPLE-PEER"
}
}
]
}
},
{
"syncResponse": true
},
{
"update": {
"timestamp": "1543236572000000000",
"prefix": {},
"update": [
{
"path": {
"elem": [
{
"name": "network-instances"
},
{
"name": "network-instance",
"key": {
"name": "default"
}
},
{
"name": "protocols"
},
{
"name": "protocol",
"key": {
"identifier": "BGP",
"name": "BGP"
}
},
{
"name": "bgp"
},
{
"name": "neighbors"
},
{
"name": "neighbor",
"key": {
"neighbor_address": "192.0.2.1"
}
},
{
"name": "state"
},
{
"name": "session-state"
}
]
},
"val": {
"stringVal": "ESTABLISHED"
}
}
]
}
},
{
"syncResponse": true
}
]

View File

@ -0,0 +1,14 @@
[[inputs.gnmi]]
addresses = ["127.0.0.1"]
[[inputs.gnmi.subscription]]
name = "oc-neigh-state"
origin = "openconfig"
path = "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/session-state"
subscription_mode = "sample"
sample_interval = "10s"
[[inputs.gnmi.tag_subscription]]
name = "oc-neigh-desc"
origin = "openconfig"
path = "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/description"
subscription_mode = "on_change"
elements = ["network-instance", "protocol", "neighbor"]

View File

@ -0,0 +1 @@
ifcounters,path=openconfig-interfaces:/interfaces/interface/state,name=eth42,descr/description=eth42,source=127.0.0.1 counters=5678i 1673608605875353770

View File

@ -0,0 +1,49 @@
[
{
"update": {
"timestamp": "1673608605875353770",
"prefix": {
"origin": "openconfig-interfaces",
"elem": [
{
"name": "interfaces"
},
{
"name": "interface",
"key":{"name":"eth42"}
},
{
"name": "state"
}
],
"target": "subscription"
},
"update": [
{
"path": {
"elem": [
{
"name": "counters"
}
]
},
"val": {
"intVal": "5678"
}
},
{
"path": {
"elem": [
{
"name": "description"
}
]
},
"val": {
"stringVal": "eth42"
}
}
]
}
}
]

View File

@ -0,0 +1,15 @@
[[inputs.gnmi]]
addresses = ["dummy"]
name_override = "gnmi"
redial = "10s"
[[inputs.gnmi.subscription]]
name = "ifcounters"
origin = "openconfig-interfaces"
path = "/interfaces/interface/state/counters"
subscription_mode = "sample"
sample_interval = "10s"
[[inputs.gnmi.tag_subscription]]
name = "descr"
origin = "openconfig-interfaces"
path = "/interfaces/interface/state/description"
subscription_mode = "on_change"

View File

@ -0,0 +1 @@
ifcounters,path=openconfig-interfaces:/interfaces/interface/state,name=eth42,descr/description=eth42,source=127.0.0.1 counters=5678i 1673608605875353770

View File

@ -0,0 +1,49 @@
[
{
"update": {
"timestamp": "1673608605875353770",
"prefix": {
"origin": "openconfig-interfaces",
"elem": [
{
"name": "interfaces"
},
{
"name": "interface",
"key":{"name":"eth42"}
},
{
"name": "state"
}
],
"target": "subscription"
},
"update": [
{
"path": {
"elem": [
{
"name": "counters"
}
]
},
"val": {
"intVal": "5678"
}
},
{
"path": {
"elem": [
{
"name": "description"
}
]
},
"val": {
"stringVal": "eth42"
}
}
]
}
}
]

View File

@ -0,0 +1,16 @@
[[inputs.gnmi]]
addresses = ["rt-01:nnnnn", "rt-02:nnnnn"]
name_override = "gnmi"
redial = "10s"
[[inputs.gnmi.subscription]]
name = "ifcounters"
origin = "openconfig-interfaces"
path = "/interfaces/interface/state/counters"
subscription_mode = "sample"
sample_interval = "10s"
[[inputs.gnmi.subscription]]
name = "descr"
origin = "openconfig-interfaces"
path = "/interfaces/interface/state/description"
subscription_mode = "on_change"
tag_only = true

View File

@ -0,0 +1,5 @@
interfaces_logical_status,descr/description=Loopback,index=0,name=lo0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667238697959
interfaces_logical_status,descr/description=Local:Mgmt,index=312,name=irb,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667241404442
interfaces_logical_status,descr/description=Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel},index=2,name=gr-0/0/0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667243155079
interfaces_logical_status,descr/description=Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345},index=1410,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250570407
interfaces_logical_status,descr/description=Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890},index=16386,name=xe-0/1/5,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="LOWER_LAYER_DOWN" 1674081667251795605

View File

@ -0,0 +1,13 @@
[
{"update":{"timestamp":"1674081667224189253", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Loopback"}}]}},
{"update":{"timestamp":"1674081667226968153", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Local:Mgmt"}}]}},
{"update":{"timestamp":"1674081667228936729", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"3"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel}"}}]}},
{"update":{"timestamp":"1674081667236178737", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345}"}}]}},
{"update":{"timestamp":"1674081667236377628", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890}"}}]}},
{"update":{"timestamp":"1674081667238697959", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667241404442", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667243155079", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"2"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667250570407", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667251795605", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"16386"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"LOWER_LAYER_DOWN"}}]}},
{"syncResponse":true}
]

View File

@ -0,0 +1,15 @@
[[inputs.gnmi]]
addresses = ["dummy"]
redial = "10s"
[[inputs.gnmi.tag_subscription]]
name = "descr"
origin = "openconfig"
path = "/interfaces/interface/subinterfaces/subinterface/state/description"
subscription_mode = "on_change"
[[inputs.gnmi.subscription]]
name = "interfaces_logical_status"
origin = "openconfig"
path = "/interfaces/interface/subinterfaces/subinterface/state/oper-status"
subscription_mode = "on_change"

View File

@ -0,0 +1,5 @@
interfaces_logical_status,descr/description=Loopback,index=0,name=lo0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667238697959
interfaces_logical_status,descr/description=Local:Mgmt,index=312,name=irb,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667241404442
interfaces_logical_status,descr/description=Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel},index=2,name=gr-0/0/0,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667243155079
interfaces_logical_status,descr/description=Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345},index=1410,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250570407
interfaces_logical_status,descr/description=Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890},index=16386,name=xe-0/1/5,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="LOWER_LAYER_DOWN" 1674081667251795605

View File

@ -0,0 +1,13 @@
[
{"update":{"timestamp":"1674081667224189253", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Loopback"}}]}},
{"update":{"timestamp":"1674081667226968153", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Local:Mgmt"}}]}},
{"update":{"timestamp":"1674081667228936729", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"3"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:GRE:abc-def-dmn1-staging:{GRE_Tunnel}"}}]}},
{"update":{"timestamp":"1674081667236178737", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345}"}}]}},
{"update":{"timestamp":"1674081667236377628", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890}"}}]}},
{"update":{"timestamp":"1674081667238697959", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"lo0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"0"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667241404442", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"irb"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"312"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667243155079", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"gr-0/0/0"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"2"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667250570407", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667251795605", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/5"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"16386"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"LOWER_LAYER_DOWN"}}]}},
{"syncResponse":true}
]

View File

@ -0,0 +1,16 @@
[[inputs.gnmi]]
addresses = ["dummy"]
redial = "10s"
[[inputs.gnmi.subscription]]
name = "descr"
origin = "openconfig"
path = "/interfaces/interface/subinterfaces/subinterface/state/description"
subscription_mode = "on_change"
tag_only = true
[[inputs.gnmi.subscription]]
name = "interfaces_logical_status"
origin = "openconfig"
path = "/interfaces/interface/subinterfaces/subinterface/state/oper-status"
subscription_mode = "on_change"

View File

@ -0,0 +1,3 @@
interfaces_logical_status,descr/description=Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345},index=1410,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250570407
interfaces_logical_status,descr/description=Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890},index=1412,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250784367
interfaces_logical_status,index=32767,name=xe-0/1/1,path=/interfaces/interface/subinterfaces/subinterface,source=127.0.0.1 oper_status="UP" 1674081667250994907

View File

@ -0,0 +1,9 @@
[
{"update":{"timestamp":"1674081667236178737", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:abc-def-dmn1-staging:{PF-BC-DAL-SFO-12345}"}}]}},
{"update":{"timestamp":"1674081667236377628", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":"Core:PacketFabric:uvw-xyz-dmn1-staging:{PF-BC-CHI-SFO-67890}"}}]}},
{"update":{"timestamp":"1674081667236582084", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"32767"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"description"}]}, "val":{"stringVal":""}}]}},
{"update":{"timestamp":"1674081667250570407", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1410"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667250784367", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"1412"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"update":{"timestamp":"1674081667250994907", "prefix":{"elem":[{"name":"interfaces"}, {"name":"interface", "key":{"name":"xe-0/1/1"}}, {"name":"subinterfaces"}, {"name":"subinterface", "key":{"index":"32767"}}]}, "update":[{"path":{"elem":[{"name":"state"}, {"name":"oper-status"}]}, "val":{"stringVal":"UP"}}]}},
{"syncResponse":true}
]

View File

@ -0,0 +1,16 @@
[[inputs.gnmi]]
addresses = ["dummy"]
redial = "10s"
[[inputs.gnmi.tag_subscription]]
name = "descr"
origin = "openconfig"
path = "/interfaces/interface/subinterfaces/subinterface/state/description"
subscription_mode = "on_change"
elements = ["interface", "subinterface"]
[[inputs.gnmi.subscription]]
name = "interfaces_logical_status"
origin = "openconfig"
path = "/interfaces/interface/subinterfaces/subinterface/state/oper-status"
subscription_mode = "on_change"

View File

@ -0,0 +1,155 @@
package gnmi
import (
"bytes"
"encoding/json"
"fmt"
"math"
"strings"
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
// Parse path to path-buffer and tag-field
func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) {
builder := bytes.NewBufferString(prefix)
// Some devices do report the origin in the first path element
// so try to find out if this is the case.
if gnmiPath.Origin == "" && len(gnmiPath.Elem) > 0 {
groups := originPattern.FindStringSubmatch(gnmiPath.Elem[0].Name)
if len(groups) == 2 {
gnmiPath.Origin = groups[1]
gnmiPath.Elem[0].Name = gnmiPath.Elem[0].Name[len(groups[1])+1:]
}
}
// Prefix with origin
if len(gnmiPath.Origin) > 0 {
if _, err := builder.WriteString(gnmiPath.Origin); err != nil {
return "", "", err
}
if _, err := builder.WriteRune(':'); err != nil {
return "", "", err
}
}
// Parse generic keys from prefix
for _, elem := range gnmiPath.Elem {
if len(elem.Name) > 0 {
if _, err := builder.WriteRune('/'); err != nil {
return "", "", err
}
if _, err := builder.WriteString(elem.Name); err != nil {
return "", "", err
}
}
name := builder.String()
if _, exists := aliases[name]; exists {
aliasPath = name
}
if tags != nil {
for key, val := range elem.Key {
key = strings.ReplaceAll(key, "-", "_")
// Use short-form of key if possible
if _, exists := tags[key]; exists {
tags[name+"/"+key] = val
} else {
tags[key] = val
}
}
}
}
return builder.String(), aliasPath, nil
}
// equalPathNoKeys checks if two gNMI paths are equal, without keys
func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool {
if len(a.Elem) != len(b.Elem) {
return false
}
for i := range a.Elem {
if a.Elem[i].Name != b.Elem[i].Name {
return false
}
}
return true
}
func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem {
var newPath []*gnmiLib.PathElem
for _, elem := range gpath.Elem {
if elem.Key != nil {
newPath = append(newPath, elem)
}
}
return newPath
}
func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path {
if prefix == nil {
return gpath
}
fullPath := new(gnmiLib.Path)
fullPath.Origin = prefix.Origin
fullPath.Target = prefix.Target
fullPath.Elem = append(prefix.Elem, gpath.Elem...)
return fullPath
}
func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) {
var value interface{}
var jsondata []byte
// Make sure a value is actually set
if updateVal == nil || updateVal.Value == nil {
return nil, nil
}
switch val := updateVal.Value.(type) {
case *gnmiLib.TypedValue_AsciiVal:
value = val.AsciiVal
case *gnmiLib.TypedValue_BoolVal:
value = val.BoolVal
case *gnmiLib.TypedValue_BytesVal:
value = val.BytesVal
case *gnmiLib.TypedValue_DoubleVal:
value = val.DoubleVal
case *gnmiLib.TypedValue_DecimalVal:
//nolint:staticcheck // to maintain backward compatibility with older gnmi specs
value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision))
case *gnmiLib.TypedValue_FloatVal:
//nolint:staticcheck // to maintain backward compatibility with older gnmi specs
value = val.FloatVal
case *gnmiLib.TypedValue_IntVal:
value = val.IntVal
case *gnmiLib.TypedValue_StringVal:
value = val.StringVal
case *gnmiLib.TypedValue_UintVal:
value = val.UintVal
case *gnmiLib.TypedValue_JsonIetfVal:
jsondata = val.JsonIetfVal
case *gnmiLib.TypedValue_JsonVal:
jsondata = val.JsonVal
}
fields := make(map[string]interface{})
if value != nil {
fields[name] = value
} else if jsondata != nil {
if err := json.Unmarshal(jsondata, &value); err != nil {
return nil, fmt.Errorf("failed to parse JSON value: %v", err)
}
flattener := jsonparser.JSONFlattener{Fields: fields}
if err := flattener.FullFlattenJSON(name, value, true, true); err != nil {
return nil, fmt.Errorf("failed to flatten JSON: %v", err)
}
}
return fields, nil
}