package gnmi import ( "context" "crypto/tls" "errors" "fmt" "io" "net" "path" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" gnmiLib "github.com/openconfig/gnmi/proto/gnmi" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/encoding/protojson" ) type handler struct { address string aliases map[string]string tagsubs []TagSubscription maxMsgSize int emptyNameWarnShown bool tagStore *tagStore trace bool log telegraf.Logger } func newHandler(addr string, aliases map[string]string, subs []TagSubscription, maxsize int, l telegraf.Logger, trace bool) *handler { return &handler{ address: addr, aliases: aliases, tagsubs: subs, maxMsgSize: maxsize, tagStore: newTagStore(subs), trace: trace, log: l, } } // SubscribeGNMI and extract telemetry data func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error { var creds credentials.TransportCredentials if tlscfg != nil { creds = credentials.NewTLS(tlscfg) } else { creds = insecure.NewCredentials() } opts := []grpc.DialOption{ grpc.WithTransportCredentials(creds), } if h.maxMsgSize > 0 { opts = append(opts, grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(h.maxMsgSize), )) } client, err := grpc.DialContext(ctx, h.address, opts...) if err != nil { return fmt.Errorf("failed to dial: %w", err) } defer client.Close() subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx) if err != nil { return fmt.Errorf("failed to setup subscription: %w", err) } // If io.EOF is returned, the stream may have ended and stream status // can be determined by calling Recv. if err := subscribeClient.Send(request); err != nil && !errors.Is(err, io.EOF) { return fmt.Errorf("failed to send subscription request: %w", err) } h.log.Debugf("Connection to gNMI device %s established", h.address) defer h.log.Debugf("Connection to gNMI device %s closed", h.address) for ctx.Err() == nil { var reply *gnmiLib.SubscribeResponse if reply, err = subscribeClient.Recv(); err != nil { if !errors.Is(err, io.EOF) && ctx.Err() == nil { return fmt.Errorf("aborted gNMI subscription: %w", err) } break } if h.trace { buf, err := protojson.Marshal(reply) if err != nil { h.log.Debugf("marshal failed: %v", err) } else { t := reply.GetUpdate().GetTimestamp() h.log.Debugf("update_%v: %s", t, string(buf)) } } if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok { h.handleSubscribeResponseUpdate(acc, response) } } return nil } // Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, response *gnmiLib.SubscribeResponse_Update) { var prefix, prefixAliasPath string grouper := metric.NewSeriesGrouper() timestamp := time.Unix(0, response.Update.Timestamp) prefixTags := make(map[string]string) if response.Update.Prefix != nil { var err error if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, h.aliases, ""); err != nil { h.log.Errorf("handling path %q failed: %v", response.Update.Prefix, err) } } prefixTags["source"], _, _ = net.SplitHostPort(h.address) if prefix != "" { prefixTags["path"] = prefix } // Process and remove tag-updates from the response first so we will // add all available tags to the metrics later. var valueUpdates []*gnmiLib.Update for _, update := range response.Update.Update { fullPath := pathWithPrefix(response.Update.Prefix, update.Path) // Prepare tags from prefix tags := make(map[string]string, len(prefixTags)) for key, val := range prefixTags { tags[key] = val } _, fields := h.handleTelemetryField(update, tags, prefix) var tagUpdate bool for _, tagSub := range h.tagsubs { if !equalPathNoKeys(fullPath, tagSub.fullPath) { continue } h.log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update) if err := h.tagStore.insert(tagSub, fullPath, fields, tags); err != nil { h.log.Errorf("inserting tag failed: %w", err) } tagUpdate = true break } if !tagUpdate { valueUpdates = append(valueUpdates, update) } } // Parse individual Update message and create measurements var name, lastAliasPath string for _, update := range valueUpdates { fullPath := pathWithPrefix(response.Update.Prefix, update.Path) // Prepare tags from prefix tags := make(map[string]string, len(prefixTags)) for key, val := range prefixTags { tags[key] = val } aliasPath, fields := h.handleTelemetryField(update, tags, prefix) // Add the tags derived via tag-subscriptions for k, v := range h.tagStore.lookup(fullPath, tags) { tags[k] = v } // Inherent valid alias from prefix parsing if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { aliasPath = prefixAliasPath } // Lookup alias if alias-path has changed if aliasPath != lastAliasPath { name = prefix if alias, ok := h.aliases[aliasPath]; ok { name = alias } else { h.log.Debugf("No measurement alias for gNMI path: %s", name) } lastAliasPath = aliasPath } // Check for empty names if name == "" && !h.emptyNameWarnShown { h.log.Warnf(emptyNameWarning, response.Update) h.emptyNameWarnShown = true } // Group metrics for k, v := range fields { key := k if len(aliasPath) < len(key) && len(aliasPath) != 0 { // This may not be an exact prefix, due to naming style // conversion on the key. key = key[len(aliasPath)+1:] } else if len(aliasPath) >= len(key) { // Otherwise use the last path element as the field key. key = path.Base(key) // If there are no elements skip the item; this would be an // invalid message. key = strings.TrimLeft(key, "/.") if key == "" { h.log.Errorf("invalid empty path: %q", k) continue } } grouper.Add(name, tags, timestamp, key, v) } } // Add grouped measurements for _, metricToAdd := range grouper.Metrics() { acc.AddMetric(metricToAdd) } } // HandleTelemetryField and add it to a measurement func (h *handler) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { gpath, aliasPath, err := handlePath(update.Path, tags, h.aliases, prefix) if err != nil { h.log.Errorf("handling path %q failed: %v", update.Path, err) } fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val) if err != nil { h.log.Errorf("error parsing update value %q: %v", update.Val, err) } return aliasPath, fields }