//go:generate ../../../tools/readme_config_includer/generator package gnmi import ( "bytes" "context" "crypto/tls" _ "embed" "encoding/json" "fmt" "io" "math" "net" "path" "regexp" "strings" "sync" "time" "github.com/google/gnxi/utils/xpath" gnmiLib "github.com/openconfig/gnmi/proto/gnmi" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" internaltls "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) //go:embed sample.conf var sampleConfig string // Regular expression to see if a path element contains an origin var originPattern = regexp.MustCompile(`^([\w-_]+):`) // Define the warning to show if we cannot get a metric name. const emptyNameWarning = `Got empty metric-name for response, usually indicating configuration issues as the response cannot be related to any subscription. Please open an issue on https://github.com/influxdata/telegraf including your device model and the following response data: %+v This message is only printed once.` // gNMI plugin instance type GNMI struct { Addresses []string `toml:"addresses"` Subscriptions []Subscription `toml:"subscription"` TagSubscriptions []TagSubscription `toml:"tag_subscription"` Aliases map[string]string `toml:"aliases"` // Optional subscription configuration Encoding string Origin string Prefix string Target string UpdatesOnly bool `toml:"updates_only"` // gNMI target credentials Username string Password string // Redial Redial config.Duration // GRPC TLS settings EnableTLS bool `toml:"enable_tls"` internaltls.ClientConfig // Internal state internalAliases map[string]string acc telegraf.Accumulator cancel context.CancelFunc wg sync.WaitGroup legacyTags bool emptyNameWarnShown bool Log telegraf.Logger } type Worker struct { address string tagStore *tagNode } type tagNode struct { elem *gnmiLib.PathElem tagName string value *gnmiLib.TypedValue tagStore map[string][]*tagNode } type tagResults struct { names []string values []*gnmiLib.TypedValue } // Subscription for a gNMI client type Subscription struct { Name string Origin string Path string fullPath *gnmiLib.Path // Subscription mode and interval SubscriptionMode string `toml:"subscription_mode"` SampleInterval config.Duration `toml:"sample_interval"` // Duplicate suppression SuppressRedundant bool `toml:"suppress_redundant"` HeartbeatInterval config.Duration `toml:"heartbeat_interval"` // Mark this subscription as a tag-only lookup source, not emitting any metric TagOnly bool `toml:"tag_only"` } // Tag Subscription for a gNMI client type TagSubscription struct { Subscription Elements []string } func (*GNMI) SampleConfig() string { return sampleConfig } // Start the http listener service func (c *GNMI) Start(acc telegraf.Accumulator) error { var err error var ctx context.Context var tlscfg *tls.Config var request *gnmiLib.SubscribeRequest c.acc = acc ctx, c.cancel = context.WithCancel(context.Background()) for i := len(c.Subscriptions) - 1; i >= 0; i-- { subscription := c.Subscriptions[i] // Support legacy TagOnly subscriptions if subscription.TagOnly { tagSub := convertTagOnlySubscription(subscription) c.TagSubscriptions = append(c.TagSubscriptions, tagSub) // Remove from the original subscriptions list c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...) c.legacyTags = true continue } if err = subscription.buildFullPath(c); err != nil { return err } } for idx := range c.TagSubscriptions { if err = c.TagSubscriptions[idx].buildFullPath(c); err != nil { return err } if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly { return fmt.Errorf("do not mix legacy tag_only subscriptions and tag subscriptions") } if len(c.TagSubscriptions[idx].Elements) == 0 { return fmt.Errorf("tag_subscription must have at least one element") } } // Validate configuration if request, err = c.newSubscribeRequest(); err != nil { return err } else if time.Duration(c.Redial).Nanoseconds() <= 0 { return fmt.Errorf("redial duration must be positive") } // Parse TLS config if c.EnableTLS { if tlscfg, err = c.ClientConfig.TLSConfig(); err != nil { return err } } if len(c.Username) > 0 { ctx = metadata.AppendToOutgoingContext(ctx, "username", c.Username, "password", c.Password) } // Invert explicit alias list and prefill subscription names c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions)) for _, s := range c.Subscriptions { if err := s.buildAlias(c.internalAliases); err != nil { return err } } for _, s := range c.TagSubscriptions { if err := s.buildAlias(c.internalAliases); err != nil { return err } } for alias, encodingPath := range c.Aliases { c.internalAliases[encodingPath] = alias } c.Log.Debugf("Internal alias mapping: %+v", c.internalAliases) // Create a goroutine for each device, dial and subscribe c.wg.Add(len(c.Addresses)) for _, addr := range c.Addresses { worker := Worker{address: addr} worker.tagStore = &tagNode{} go func(worker Worker) { defer c.wg.Done() for ctx.Err() == nil { if err := c.subscribeGNMI(ctx, &worker, tlscfg, request); err != nil && ctx.Err() == nil { acc.AddError(err) } select { case <-ctx.Done(): case <-time.After(time.Duration(c.Redial)): } } }(worker) } return nil } func (s *Subscription) buildSubscription() (*gnmiLib.Subscription, error) { gnmiPath, err := parsePath(s.Origin, s.Path, "") if err != nil { return nil, err } mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(s.SubscriptionMode)] if !ok { return nil, fmt.Errorf("invalid subscription mode %s", s.SubscriptionMode) } return &gnmiLib.Subscription{ Path: gnmiPath, Mode: gnmiLib.SubscriptionMode(mode), HeartbeatInterval: uint64(time.Duration(s.HeartbeatInterval).Nanoseconds()), SampleInterval: uint64(time.Duration(s.SampleInterval).Nanoseconds()), SuppressRedundant: s.SuppressRedundant, }, nil } // Create a new gNMI SubscribeRequest func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) { // Create subscription objects var err error subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions)+len(c.TagSubscriptions)) for i, subscription := range c.TagSubscriptions { if subscriptions[i], err = subscription.buildSubscription(); err != nil { return nil, err } } for i, subscription := range c.Subscriptions { if subscriptions[i+len(c.TagSubscriptions)], err = subscription.buildSubscription(); err != nil { return nil, err } } // Construct subscribe request gnmiPath, err := parsePath(c.Origin, c.Prefix, c.Target) if err != nil { return nil, err } // Do not provide an empty prefix. Required for Huawei NE40 router v8.21 // (and possibly others). See https://github.com/influxdata/telegraf/issues/12273. if gnmiPath.Origin == "" && gnmiPath.Target == "" && len(gnmiPath.Elem) == 0 { gnmiPath = nil } if c.Encoding != "proto" && c.Encoding != "json" && c.Encoding != "json_ietf" && c.Encoding != "bytes" { return nil, fmt.Errorf("unsupported encoding %s", c.Encoding) } return &gnmiLib.SubscribeRequest{ Request: &gnmiLib.SubscribeRequest_Subscribe{ Subscribe: &gnmiLib.SubscriptionList{ Prefix: gnmiPath, Mode: gnmiLib.SubscriptionList_STREAM, Encoding: gnmiLib.Encoding(gnmiLib.Encoding_value[strings.ToUpper(c.Encoding)]), Subscription: subscriptions, UpdatesOnly: c.UpdatesOnly, }, }, }, nil } // SubscribeGNMI and extract telemetry data func (c *GNMI) subscribeGNMI(ctx context.Context, worker *Worker, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { var creds credentials.TransportCredentials if tlscfg != nil { creds = credentials.NewTLS(tlscfg) } else { creds = insecure.NewCredentials() } opt := grpc.WithTransportCredentials(creds) client, err := grpc.DialContext(ctx, worker.address, opt) if err != nil { return fmt.Errorf("failed to dial: %v", err) } defer client.Close() subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx) if err != nil { return fmt.Errorf("failed to setup subscription: %v", err) } if err = subscribeClient.Send(request); err != nil { // If io.EOF is returned, the stream may have ended and stream status // can be determined by calling Recv. if err != io.EOF { return fmt.Errorf("failed to send subscription request: %v", err) } } c.Log.Debugf("Connection to gNMI device %s established", worker.address) defer c.Log.Debugf("Connection to gNMI device %s closed", worker.address) for ctx.Err() == nil { var reply *gnmiLib.SubscribeResponse if reply, err = subscribeClient.Recv(); err != nil { if err != io.EOF && ctx.Err() == nil { return fmt.Errorf("aborted gNMI subscription: %v", err) } break } c.handleSubscribeResponse(worker, reply) } return nil } func (c *GNMI) handleSubscribeResponse(worker *Worker, reply *gnmiLib.SubscribeResponse) { if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok { c.handleSubscribeResponseUpdate(worker, response) } } // Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.SubscribeResponse_Update) { var prefix, prefixAliasPath string grouper := metric.NewSeriesGrouper() timestamp := time.Unix(0, response.Update.Timestamp) prefixTags := make(map[string]string) if response.Update.Prefix != nil { var err error if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, c.internalAliases, ""); err != nil { c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err) } } prefixTags["source"], _, _ = net.SplitHostPort(worker.address) prefixTags["path"] = prefix // Process and remove tag-only updates from the response for i := len(response.Update.Update) - 1; i >= 0; i-- { update := response.Update.Update[i] fullPath := pathWithPrefix(response.Update.Prefix, update.Path) for _, tagSub := range c.TagSubscriptions { if equalPathNoKeys(fullPath, tagSub.fullPath) { worker.storeTags(update, tagSub) response.Update.Update = append(response.Update.Update[:i], response.Update.Update[i+1:]...) } } } // Parse individual Update message and create measurements var name, lastAliasPath string for _, update := range response.Update.Update { fullPath := pathWithPrefix(response.Update.Prefix, update.Path) // Prepare tags from prefix tags := make(map[string]string, len(prefixTags)) for key, val := range prefixTags { tags[key] = val } aliasPath, fields := c.handleTelemetryField(update, tags, prefix) if tagOnlyTags := worker.checkTags(fullPath); tagOnlyTags != nil { for k, v := range tagOnlyTags { if alias, ok := c.internalAliases[k]; ok { tags[alias] = fmt.Sprint(v) } else { tags[k] = fmt.Sprint(v) } } } // Inherent valid alias from prefix parsing if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { aliasPath = prefixAliasPath } // Lookup alias if alias-path has changed if aliasPath != lastAliasPath { name = prefix if alias, ok := c.internalAliases[aliasPath]; ok { name = alias } else { c.Log.Debugf("No measurement alias for gNMI path: %s", name) } } // Check for empty names if name == "" && !c.emptyNameWarnShown { c.Log.Warnf(emptyNameWarning, response.Update) c.emptyNameWarnShown = true } // Group metrics for k, v := range fields { key := k if len(aliasPath) < len(key) && len(aliasPath) != 0 { // This may not be an exact prefix, due to naming style // conversion on the key. key = key[len(aliasPath)+1:] } else if len(aliasPath) >= len(key) { // Otherwise use the last path element as the field key. key = path.Base(key) // If there are no elements skip the item; this would be an // invalid message. key = strings.TrimLeft(key, "/.") if key == "" { c.Log.Errorf("invalid empty path: %q", k) continue } } grouper.Add(name, tags, timestamp, key, v) } lastAliasPath = aliasPath } // Add grouped measurements for _, metricToAdd := range grouper.Metrics() { c.acc.AddMetric(metricToAdd) } } // HandleTelemetryField and add it to a measurement func (c *GNMI) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { gpath, aliasPath, err := handlePath(update.Path, tags, c.internalAliases, prefix) if err != nil { c.Log.Errorf("handling path %q failed: %v", update.Path, err) } fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val) if err != nil { c.Log.Errorf("error parsing update value %q: %v", update.Val, err) } return aliasPath, fields } // Parse path to path-buffer and tag-field func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) { builder := bytes.NewBufferString(prefix) // Some devices do report the origin in the first path element // so try to find out if this is the case. if gnmiPath.Origin == "" && len(gnmiPath.Elem) > 0 { groups := originPattern.FindStringSubmatch(gnmiPath.Elem[0].Name) if len(groups) == 2 { gnmiPath.Origin = groups[1] gnmiPath.Elem[0].Name = gnmiPath.Elem[0].Name[len(groups[1])+1:] } } // Prefix with origin if len(gnmiPath.Origin) > 0 { if _, err := builder.WriteString(gnmiPath.Origin); err != nil { return "", "", err } if _, err := builder.WriteRune(':'); err != nil { return "", "", err } } // Parse generic keys from prefix for _, elem := range gnmiPath.Elem { if len(elem.Name) > 0 { if _, err := builder.WriteRune('/'); err != nil { return "", "", err } if _, err := builder.WriteString(elem.Name); err != nil { return "", "", err } } name := builder.String() if _, exists := aliases[name]; exists { aliasPath = name } if tags != nil { for key, val := range elem.Key { key = strings.ReplaceAll(key, "-", "_") // Use short-form of key if possible if _, exists := tags[key]; exists { tags[name+"/"+key] = val } else { tags[key] = val } } } } return builder.String(), aliasPath, nil } // ParsePath from XPath-like string to gNMI path structure func parsePath(origin string, pathToParse string, target string) (*gnmiLib.Path, error) { gnmiPath, err := xpath.ToGNMIPath(pathToParse) if err != nil { return nil, err } gnmiPath.Origin = origin gnmiPath.Target = target return gnmiPath, err } // Stop listener and cleanup func (c *GNMI) Stop() { c.cancel() c.wg.Wait() } // Gather plugin measurements (unused) func (c *GNMI) Gather(_ telegraf.Accumulator) error { return nil } func New() telegraf.Input { return &GNMI{ Encoding: "proto", Redial: config.Duration(10 * time.Second), } } func init() { inputs.Add("gnmi", New) // Backwards compatible alias: inputs.Add("cisco_telemetry_gnmi", New) } func convertTagOnlySubscription(s Subscription) TagSubscription { t := TagSubscription{Subscription: s, Elements: []string{"interface"}} return t } // equalPathNoKeys checks if two gNMI paths are equal, without keys func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool { if len(a.Elem) != len(b.Elem) { return false } for i := range a.Elem { if a.Elem[i].Name != b.Elem[i].Name { return false } } return true } func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem { var newPath []*gnmiLib.PathElem for _, elem := range gpath.Elem { if elem.Key != nil { newPath = append(newPath, elem) } } return newPath } func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path { if prefix == nil { return gpath } fullPath := new(gnmiLib.Path) fullPath.Origin = prefix.Origin fullPath.Target = prefix.Target fullPath.Elem = append(prefix.Elem, gpath.Elem...) return fullPath } func (s *Subscription) buildFullPath(c *GNMI) error { var err error if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil { return err } s.fullPath.Origin = s.Origin s.fullPath.Target = c.Target if c.Prefix != "" { prefix, err := xpath.ToGNMIPath(c.Prefix) if err != nil { return err } s.fullPath.Elem = append(prefix.Elem, s.fullPath.Elem...) if s.Origin == "" && c.Origin != "" { s.fullPath.Origin = c.Origin } } return nil } func (w *Worker) storeTags(update *gnmiLib.Update, sub TagSubscription) { updateKeys := pathKeys(update.Path) var foundKey bool for _, requiredKey := range sub.Elements { foundKey = false for _, elem := range updateKeys { if elem.Name == requiredKey { foundKey = true } } if !foundKey { return } } // All required keys present for this TagSubscription w.tagStore.insert(updateKeys, sub.Name, update.Val) } func (node *tagNode) insert(keys []*gnmiLib.PathElem, name string, value *gnmiLib.TypedValue) { if len(keys) == 0 { node.value = value node.tagName = name return } var found *tagNode key := keys[0] keyName := key.Name if node.tagStore == nil { node.tagStore = make(map[string][]*tagNode) } if _, ok := node.tagStore[keyName]; !ok { node.tagStore[keyName] = make([]*tagNode, 0) } for _, node := range node.tagStore[keyName] { if compareKeys(node.elem.Key, key.Key) { found = node break } } if found == nil { found = &tagNode{elem: keys[0]} node.tagStore[keyName] = append(node.tagStore[keyName], found) } found.insert(keys[1:], name, value) } func (node *tagNode) retrieve(keys []*gnmiLib.PathElem, tagResults *tagResults) { if node.value != nil { tagResults.names = append(tagResults.names, node.tagName) tagResults.values = append(tagResults.values, node.value) } for _, key := range keys { if elems, ok := node.tagStore[key.Name]; ok { for _, node := range elems { if compareKeys(node.elem.Key, key.Key) { node.retrieve(keys, tagResults) } } } } } func (w *Worker) checkTags(fullPath *gnmiLib.Path) map[string]interface{} { results := &tagResults{} w.tagStore.retrieve(pathKeys(fullPath), results) tags := make(map[string]interface{}) for idx := range results.names { vals, _ := gnmiToFields(results.names[idx], results.values[idx]) for k, v := range vals { tags[k] = v } } return tags } func (s *Subscription) buildAlias(aliases map[string]string) error { var err error var gnmiLongPath, gnmiShortPath *gnmiLib.Path // Build the subscription path without keys if gnmiLongPath, err = parsePath(s.Origin, s.Path, ""); err != nil { return err } if gnmiShortPath, err = parsePath("", s.Path, ""); err != nil { return err } longPath, _, err := handlePath(gnmiLongPath, nil, nil, "") if err != nil { return fmt.Errorf("handling long-path failed: %v", err) } shortPath, _, err := handlePath(gnmiShortPath, nil, nil, "") if err != nil { return fmt.Errorf("handling short-path failed: %v", err) } // If the user didn't provide a measurement name, use last path element name := s.Name if len(name) == 0 { name = path.Base(shortPath) } if len(name) > 0 { aliases[longPath] = name aliases[shortPath] = name } return nil } func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) { var value interface{} var jsondata []byte // Make sure a value is actually set if updateVal == nil || updateVal.Value == nil { return nil, nil } switch val := updateVal.Value.(type) { case *gnmiLib.TypedValue_AsciiVal: value = val.AsciiVal case *gnmiLib.TypedValue_BoolVal: value = val.BoolVal case *gnmiLib.TypedValue_BytesVal: value = val.BytesVal case *gnmiLib.TypedValue_DoubleVal: value = val.DoubleVal case *gnmiLib.TypedValue_DecimalVal: //nolint:staticcheck // to maintain backward compatibility with older gnmi specs value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision)) case *gnmiLib.TypedValue_FloatVal: //nolint:staticcheck // to maintain backward compatibility with older gnmi specs value = val.FloatVal case *gnmiLib.TypedValue_IntVal: value = val.IntVal case *gnmiLib.TypedValue_StringVal: value = val.StringVal case *gnmiLib.TypedValue_UintVal: value = val.UintVal case *gnmiLib.TypedValue_JsonIetfVal: jsondata = val.JsonIetfVal case *gnmiLib.TypedValue_JsonVal: jsondata = val.JsonVal } fields := make(map[string]interface{}) if value != nil { fields[name] = value } else if jsondata != nil { if err := json.Unmarshal(jsondata, &value); err != nil { return nil, fmt.Errorf("failed to parse JSON value: %v", err) } flattener := jsonparser.JSONFlattener{Fields: fields} if err := flattener.FullFlattenJSON(name, value, true, true); err != nil { return nil, fmt.Errorf("failed to flatten JSON: %v", err) } } return fields, nil } func compareKeys(a map[string]string, b map[string]string) bool { if len(a) != len(b) { return false } for k, v := range a { if _, ok := b[k]; !ok { return false } if b[k] != v { return false } } return true }