From c6f1c66bf83daa2b2e324a734cfa813c492aa53c Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 31 Oct 2023 17:51:05 +0100 Subject: [PATCH] feat(inputs.gnmi): Rework plugin (#14091) --- plugins/inputs/gnmi/README.md | 4 + plugins/inputs/gnmi/gnmi.go | 30 +- plugins/inputs/gnmi/handler.go | 225 +++++---- plugins/inputs/gnmi/path.go | 291 ++++++++++++ plugins/inputs/gnmi/sample.conf | 4 + plugins/inputs/gnmi/tag_store.go | 55 ++- .../gnmi/testcases/issue_14044/expected.out | 1 + .../gnmi/testcases/issue_14044/responses.json | 33 ++ .../gnmi/testcases/issue_14044/telegraf.conf | 12 + .../gnmi/testcases/issue_14063/expected.out | 1 + .../gnmi/testcases/issue_14063/responses.json | 445 ++++++++++++++++++ .../gnmi/testcases/issue_14063/telegraf.conf | 11 + plugins/inputs/gnmi/update_fields.go | 92 ++++ plugins/inputs/gnmi/utils.go | 154 ------ 14 files changed, 1063 insertions(+), 295 deletions(-) create mode 100644 plugins/inputs/gnmi/path.go create mode 100644 plugins/inputs/gnmi/testcases/issue_14044/expected.out create mode 100644 plugins/inputs/gnmi/testcases/issue_14044/responses.json create mode 100644 plugins/inputs/gnmi/testcases/issue_14044/telegraf.conf create mode 100644 plugins/inputs/gnmi/testcases/issue_14063/expected.out create mode 100644 plugins/inputs/gnmi/testcases/issue_14063/responses.json create mode 100644 plugins/inputs/gnmi/testcases/issue_14063/telegraf.conf create mode 100644 plugins/inputs/gnmi/update_fields.go delete mode 100644 plugins/inputs/gnmi/utils.go diff --git a/plugins/inputs/gnmi/README.md b/plugins/inputs/gnmi/README.md index dcb6cabe7..6802f1d11 100644 --- a/plugins/inputs/gnmi/README.md +++ b/plugins/inputs/gnmi/README.md @@ -61,6 +61,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Remove leading slashes and dots in field-name # trim_field_names = false + ## Guess the path-tag if an update does not contain a prefix-path + ## If enabled, the common-path of all elements in the update is used. + # guess_path_tag = false + ## enable client-side TLS and define CA to authenticate the device # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/inputs/gnmi/gnmi.go b/plugins/inputs/gnmi/gnmi.go index 52f4d600b..f46530494 100644 --- a/plugins/inputs/gnmi/gnmi.go +++ b/plugins/inputs/gnmi/gnmi.go @@ -5,8 +5,6 @@ import ( "context" _ "embed" "fmt" - "path" - "regexp" "strings" "sync" "time" @@ -25,9 +23,6 @@ import ( //go:embed sample.conf var sampleConfig string -// Regular expression to see if a path element contains an origin -var originPattern = regexp.MustCompile(`^([\w-_]+):`) - // Define the warning to show if we cannot get a metric name. const emptyNameWarning = `Got empty metric-name for response, usually indicating configuration issues as the response cannot be related to any subscription. @@ -58,12 +53,13 @@ type GNMI struct { Trace bool `toml:"dump_responses"` CanonicalFieldNames bool `toml:"canonical_field_names"` TrimFieldNames bool `toml:"trim_field_names"` + GuessPathTag bool `toml:"guess_path_tag"` EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;use 'tls_enable' instead"` Log telegraf.Logger `toml:"-"` internaltls.ClientConfig // Internal state - internalAliases map[string]string + internalAliases map[*pathInfo]string cancel context.CancelFunc wg sync.WaitGroup } @@ -169,7 +165,7 @@ func (c *GNMI) Init() error { } // Invert explicit alias list and prefill subscription names - c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions)) + c.internalAliases = make(map[*pathInfo]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions)) for _, s := range c.Subscriptions { if err := s.buildAlias(c.internalAliases); err != nil { return err @@ -181,7 +177,7 @@ func (c *GNMI) Init() error { } } for alias, encodingPath := range c.Aliases { - c.internalAliases[encodingPath] = alias + c.internalAliases[newInfoFromString(encodingPath)] = alias } c.Log.Debugf("Internal alias mapping: %+v", c.internalAliases) @@ -224,6 +220,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { trace: c.Trace, canonicalFieldNames: c.CanonicalFieldNames, trimSlash: c.TrimFieldNames, + guessPathTag: c.GuessPathTag, log: c.Log, } for ctx.Err() == nil { @@ -362,26 +359,21 @@ func (s *Subscription) buildFullPath(c *GNMI) error { return nil } -func (s *Subscription) buildAlias(aliases map[string]string) error { +func (s *Subscription) buildAlias(aliases map[*pathInfo]string) error { // Build the subscription path without keys - gnmiPath, err := parsePath(s.Origin, s.Path, "") + path, err := parsePath(s.Origin, s.Path, "") if err != nil { return err } - - origin, spath, _, err := handlePath(gnmiPath, nil, nil, "") - if err != nil { - return fmt.Errorf("handling path failed: %w", err) - } + info := newInfoFromPathWithoutKeys(path) // If the user didn't provide a measurement name, use last path element name := s.Name - if name == "" { - name = path.Base(spath) + if name == "" && len(info.segments) > 0 { + name = info.segments[len(info.segments)-1] } if name != "" { - aliases[origin+spath] = name - aliases[spath] = name + aliases[info] = name } return nil } diff --git a/plugins/inputs/gnmi/handler.go b/plugins/inputs/gnmi/handler.go index 861c87bf0..81e8f04f4 100644 --- a/plugins/inputs/gnmi/handler.go +++ b/plugins/inputs/gnmi/handler.go @@ -8,6 +8,7 @@ import ( "io" "net" "path" + "sort" "strconv" "strings" "time" @@ -31,7 +32,7 @@ const eidJuniperTelemetryHeader = 1 type handler struct { address string - aliases map[string]string + aliases map[*pathInfo]string tagsubs []TagSubscription maxMsgSize int emptyNameWarnShown bool @@ -40,6 +41,7 @@ type handler struct { trace bool canonicalFieldNames bool trimSlash bool + guessPathTag bool log telegraf.Logger } @@ -117,74 +119,70 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t // Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, response *gnmiLib.SubscribeResponse_Update, extension []*gnmiExt.Extension) { - var prefix, prefixAliasPath string grouper := metric.NewSeriesGrouper() timestamp := time.Unix(0, response.Update.Timestamp) - prefixTags := make(map[string]string) - // iter on each extension + // Extract tags from potential extension in the update notification + headerTags := make(map[string]string) for _, ext := range extension { currentExt := ext.GetRegisteredExt().Msg if currentExt == nil { break } - // extension ID + switch ext.GetRegisteredExt().Id { - // Juniper Header extention - //EID_JUNIPER_TELEMETRY_HEADER = 1; case eidJuniperTelemetryHeader: + // Juniper Header extention // Decode it only if user requested it if choice.Contains("juniper_header", h.vendorExt) { juniperHeader := &jnprHeader.GnmiJuniperTelemetryHeaderExtension{} - // unmarshal extention - err := proto.Unmarshal(currentExt, juniperHeader) - if err != nil { + if err := proto.Unmarshal(currentExt, juniperHeader); err != nil { h.log.Errorf("unmarshal gnmi Juniper Header extension failed: %v", err) - break + } else { + // Add only relevant Tags from the Juniper Header extension. + // These are required for aggregation + headerTags["component_id"] = strconv.FormatUint(uint64(juniperHeader.GetComponentId()), 10) + headerTags["component"] = juniperHeader.GetComponent() + headerTags["sub_component_id"] = strconv.FormatUint(uint64(juniperHeader.GetSubComponentId()), 10) } - // Add only relevant Tags from the Juniper Header extension. - // These are required for aggregation - prefixTags["component_id"] = strconv.FormatUint(uint64(juniperHeader.GetComponentId()), 10) - prefixTags["component"] = juniperHeader.GetComponent() - prefixTags["sub_component_id"] = strconv.FormatUint(uint64(juniperHeader.GetSubComponentId()), 10) } - default: continue } } - if response.Update.Prefix != nil { - var origin string - var err error - if origin, prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, h.aliases, ""); err != nil { - h.log.Errorf("Handling path %q failed: %v", response.Update.Prefix, err) - } - prefix = origin + prefix + // Extract the path part valid for the whole set of updates if any + prefix := newInfoFromPath(response.Update.Prefix) + + // Add info to the tags + headerTags["source"], _, _ = net.SplitHostPort(h.address) + if !prefix.empty() { + headerTags["path"] = prefix.String() } - prefixTags["source"], _, _ = net.SplitHostPort(h.address) - if prefix != "" { - prefixTags["path"] = prefix - } - - // Process and remove tag-updates from the response first so we will + // Process and remove tag-updates from the response first so we can // add all available tags to the metrics later. - var valueUpdates []*gnmiLib.Update + var valueFields []updateField for _, update := range response.Update.Update { - fullPath := pathWithPrefix(response.Update.Prefix, update.Path) + fullPath := prefix.append(update.Path) + fields, err := newFieldsFromUpdate(fullPath, update) + if err != nil { + h.log.Errorf("Processing update %v failed: %v", update, err) + } // Prepare tags from prefix - tags := make(map[string]string, len(prefixTags)) - for key, val := range prefixTags { + tags := make(map[string]string, len(headerTags)) + for key, val := range headerTags { + tags[key] = val + } + for key, val := range fullPath.Tags() { tags[key] = val } - _, fields := h.handleTelemetryField(update, tags, prefix) - + // TODO: Handle each field individually to allow in-JSON tags var tagUpdate bool for _, tagSub := range h.tagsubs { - if !equalPathNoKeys(fullPath, tagSub.fullPath) { + if !fullPath.equalsPathNoKeys(tagSub.fullPath) { continue } h.log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update) @@ -195,77 +193,71 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon break } if !tagUpdate { - valueUpdates = append(valueUpdates, update) + valueFields = append(valueFields, fields...) } } - // Parse individual Update message and create measurements - var name, lastAliasPath string - for _, update := range valueUpdates { - fullPath := pathWithPrefix(response.Update.Prefix, update.Path) + // Some devices do not provide a prefix, so do some guesswork based + // on the paths of the fields + if headerTags["path"] == "" && h.guessPathTag { + if prefixPath := guessPrefixFromUpdate(valueFields); prefixPath != "" { + headerTags["path"] = prefixPath + } + } + // Parse individual update message and create measurements + for _, field := range valueFields { // Prepare tags from prefix - tags := make(map[string]string, len(prefixTags)) - for key, val := range prefixTags { + fieldTags := field.path.Tags() + tags := make(map[string]string, len(headerTags)+len(fieldTags)) + for key, val := range headerTags { + tags[key] = val + } + for key, val := range fieldTags { tags[key] = val } - aliasPath, fields := h.handleTelemetryField(update, tags, prefix) - // Add the tags derived via tag-subscriptions - for k, v := range h.tagStore.lookup(fullPath, tags) { + for k, v := range h.tagStore.lookup(field.path, tags) { tags[k] = v } - // Inherent valid alias from prefix parsing - if len(prefixAliasPath) > 0 && len(aliasPath) == 0 { - aliasPath = prefixAliasPath - } - - // Lookup alias if alias-path has changed - if aliasPath != lastAliasPath { - name = prefix - if alias, ok := h.aliases[aliasPath]; ok { - name = alias - } else { - h.log.Debugf("No measurement alias for gNMI path: %s", name) + // Lookup alias for the metric + aliasPath, name := h.lookupAlias(field.path) + if name == "" { + h.log.Debugf("No measurement alias for gNMI path: %s", field.path) + if !h.emptyNameWarnShown { + h.log.Warnf(emptyNameWarning, response.Update) + h.emptyNameWarnShown = true } - lastAliasPath = aliasPath - } - - // Check for empty names - if name == "" && !h.emptyNameWarnShown { - h.log.Warnf(emptyNameWarning, response.Update) - h.emptyNameWarnShown = true } // Group metrics - for k, v := range fields { - key := k - if h.canonicalFieldNames { - // Strip the origin is any for the field names - if parts := strings.SplitN(key, ":", 2); len(parts) == 2 { - key = parts[1] - } - } else { - if len(aliasPath) < len(key) && len(aliasPath) != 0 { - // This may not be an exact prefix, due to naming style - // conversion on the key. - key = key[len(aliasPath)+1:] - } else if len(aliasPath) >= len(key) { - // Otherwise use the last path element as the field key. - key = path.Base(key) - } + fieldPath := field.path.String() + key := strings.ReplaceAll(fieldPath, "-", "_") + if h.canonicalFieldNames { + // Strip the origin is any for the field names + if parts := strings.SplitN(key, ":", 2); len(parts) == 2 { + key = parts[1] } - if h.trimSlash { - key = strings.TrimLeft(key, "/.") + } else { + if len(aliasPath) < len(key) && len(aliasPath) != 0 { + // This may not be an exact prefix, due to naming style + // conversion on the key. + key = key[len(aliasPath)+1:] + } else if len(aliasPath) >= len(key) { + // Otherwise use the last path element as the field key. + key = path.Base(key) } - if key == "" { - h.log.Errorf("Invalid empty path: %q", k) - continue - } - grouper.Add(name, tags, timestamp, key, v) } + if h.trimSlash { + key = strings.TrimLeft(key, "/.") + } + if key == "" { + h.log.Errorf("Invalid empty path %q with alias %q", fieldPath, aliasPath) + continue + } + grouper.Add(name, tags, timestamp, key, field.value) } // Add grouped measurements @@ -274,15 +266,48 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon } } -// HandleTelemetryField and add it to a measurement -func (h *handler) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) { - _, gpath, aliasPath, err := handlePath(update.Path, tags, h.aliases, prefix) - if err != nil { - h.log.Errorf("Handling path %q failed: %v", update.Path, err) - } - fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val) - if err != nil { - h.log.Errorf("Error parsing update value %q: %v", update.Val, err) - } - return aliasPath, fields +// Try to find the alias for the given path +type aliasCandidate struct { + path, alias string +} + +func (h *handler) lookupAlias(info *pathInfo) (aliasPath, alias string) { + candidates := make([]aliasCandidate, 0) + for i, a := range h.aliases { + if !i.isSubPathOf(info) { + continue + } + candidates = append(candidates, aliasCandidate{i.String(), a}) + } + if len(candidates) == 0 { + return "", "" + } + + // Reverse sort the candidates by path length so we can use the longest match + sort.SliceStable(candidates, func(i, j int) bool { + return len(candidates[i].path) > len(candidates[j].path) + }) + + return candidates[0].path, candidates[0].alias +} + +func guessPrefixFromUpdate(fields []updateField) string { + if len(fields) == 0 { + return "" + } + if len(fields) == 1 { + dir, _ := fields[0].path.split() + return dir + } + commonPath := &pathInfo{ + origin: fields[0].path.origin, + segments: append([]string{}, fields[0].path.segments...), + } + for _, f := range fields[1:] { + commonPath.keepCommonPart(f.path) + } + if commonPath.empty() { + return "" + } + return commonPath.String() } diff --git a/plugins/inputs/gnmi/path.go b/plugins/inputs/gnmi/path.go new file mode 100644 index 000000000..e00af0dd6 --- /dev/null +++ b/plugins/inputs/gnmi/path.go @@ -0,0 +1,291 @@ +package gnmi + +import ( + "regexp" + "strings" + + gnmiLib "github.com/openconfig/gnmi/proto/gnmi" +) + +// Regular expression to see if a path element contains an origin +var originPattern = regexp.MustCompile(`^([\w-_]+):`) + +type keySegment struct { + name string + path string + kv map[string]string +} + +type pathInfo struct { + origin string + target string + segments []string + keyValues []keySegment +} + +func newInfoFromString(path string) *pathInfo { + if path == "" { + return &pathInfo{} + } + + info := &pathInfo{} + for _, s := range strings.Split(path, "/") { + if s != "" { + info.segments = append(info.segments, s) + } + } + info.normalize() + + return info +} + +func newInfoFromPathWithoutKeys(path *gnmiLib.Path) *pathInfo { + info := &pathInfo{ + origin: path.Origin, + segments: make([]string, 0, len(path.Elem)), + } + for _, elem := range path.Elem { + if elem.Name == "" { + continue + } + info.segments = append(info.segments, elem.Name) + } + info.normalize() + + return info +} + +func newInfoFromPath(paths ...*gnmiLib.Path) *pathInfo { + if len(paths) == 0 { + return nil + } + + info := &pathInfo{} + if paths[0] != nil { + info.origin = paths[0].Origin + info.target = paths[0].Target + } + + for _, p := range paths { + if p == nil { + continue + } + for _, elem := range p.Elem { + if elem.Name == "" { + continue + } + info.segments = append(info.segments, elem.Name) + + if len(elem.Key) == 0 { + continue + } + keyInfo := keySegment{ + name: elem.Name, + path: info.String(), + kv: make(map[string]string, len(elem.Key)), + } + for k, v := range elem.Key { + keyInfo.kv[k] = v + } + info.keyValues = append(info.keyValues, keyInfo) + } + } + info.normalize() + + return info +} + +func (pi *pathInfo) empty() bool { + return len(pi.segments) == 0 +} + +func (pi *pathInfo) append(paths ...*gnmiLib.Path) *pathInfo { + // Copy the existing info + path := &pathInfo{ + origin: pi.origin, + target: pi.target, + segments: append([]string{}, pi.segments...), + keyValues: make([]keySegment, 0, len(pi.keyValues)), + } + for _, elem := range pi.keyValues { + keyInfo := keySegment{ + name: elem.name, + path: elem.path, + kv: make(map[string]string, len(elem.kv)), + } + for k, v := range elem.kv { + keyInfo.kv[k] = v + } + path.keyValues = append(path.keyValues, keyInfo) + } + + // Add the new segments + for _, p := range paths { + for _, elem := range p.Elem { + if elem.Name == "" { + continue + } + path.segments = append(path.segments, elem.Name) + + if len(elem.Key) == 0 { + continue + } + keyInfo := keySegment{ + name: elem.Name, + path: path.String(), + kv: make(map[string]string, len(elem.Key)), + } + for k, v := range elem.Key { + keyInfo.kv[k] = v + } + path.keyValues = append(path.keyValues, keyInfo) + } + } + + return path +} + +func (pi *pathInfo) appendSegments(segments ...string) *pathInfo { + // Copy the existing info + path := &pathInfo{ + origin: pi.origin, + target: pi.target, + segments: append([]string{}, pi.segments...), + keyValues: make([]keySegment, 0, len(pi.keyValues)), + } + for _, elem := range pi.keyValues { + keyInfo := keySegment{ + name: elem.name, + path: elem.path, + kv: make(map[string]string, len(elem.kv)), + } + for k, v := range elem.kv { + keyInfo.kv[k] = v + } + path.keyValues = append(path.keyValues, keyInfo) + } + + // Add the new segments + for _, s := range segments { + if s == "" { + continue + } + path.segments = append(path.segments, s) + } + + return path +} + +func (pi *pathInfo) normalize() { + if len(pi.segments) == 0 { + return + } + + // Some devices supply the origin as part of the first path element, + // so try to find and extract it there. + groups := originPattern.FindStringSubmatch(pi.segments[0]) + if len(groups) == 2 { + pi.origin = groups[1] + pi.segments[0] = pi.segments[0][len(groups[1])+1:] + } +} + +func (pi *pathInfo) equalsPathNoKeys(path *gnmiLib.Path) bool { + if len(pi.segments) != len(path.Elem) { + return false + } + for i, s := range pi.segments { + if s != path.Elem[i].Name { + return false + } + } + return true +} + +func (pi *pathInfo) isSubPathOf(path *pathInfo) bool { + // If both set an origin it has to match. Otherwise we ignore the origin + if pi.origin != "" && path.origin != "" && pi.origin != path.origin { + return false + } + + // The "parent" path should have the same length or be shorter than the + // sub-path to have a chance to match + if len(pi.segments) > len(path.segments) { + return false + } + + // Compare the elements and exit if we find a mismatch + for i, p := range pi.segments { + if p != path.segments[i] { + return false + } + } + + return true +} + +func (pi *pathInfo) keepCommonPart(path *pathInfo) { + shortestLen := len(pi.segments) + if len(path.segments) < shortestLen { + shortestLen = len(path.segments) + } + + // Compare the elements and stop as soon as they do mismatch + var matchLen int + for i, p := range pi.segments[:shortestLen] { + if p != path.segments[i] { + break + } + matchLen = i + 1 + } + if matchLen < 1 { + pi.segments = nil + return + } + pi.segments = pi.segments[:matchLen] +} + +func (pi *pathInfo) split() (dir, base string) { + if len(pi.segments) == 0 { + return "", "" + } + if len(pi.segments) == 1 { + return "", pi.segments[0] + } + + dir = "/" + strings.Join(pi.segments[:len(pi.segments)-1], "/") + if pi.origin != "" { + dir = pi.origin + ":" + dir + } + return dir, pi.segments[len(pi.segments)-1] +} + +func (pi *pathInfo) String() string { + if len(pi.segments) == 0 { + return "" + } + + out := "/" + strings.Join(pi.segments, "/") + if pi.origin != "" { + out = pi.origin + ":" + out + } + return out +} + +func (pi *pathInfo) Tags() map[string]string { + tags := make(map[string]string, len(pi.keyValues)) + for _, s := range pi.keyValues { + for k, v := range s.kv { + key := strings.ReplaceAll(k, "-", "_") + + // Use short-form of key if possible + if _, exists := tags[key]; !exists { + tags[key] = v + continue + } + tags[s.path+"/"+key] = v + } + } + + return tags +} diff --git a/plugins/inputs/gnmi/sample.conf b/plugins/inputs/gnmi/sample.conf index 599ea0989..7e330f79d 100644 --- a/plugins/inputs/gnmi/sample.conf +++ b/plugins/inputs/gnmi/sample.conf @@ -22,6 +22,10 @@ ## Remove leading slashes and dots in field-name # trim_field_names = false + ## Guess the path-tag if an update does not contain a prefix-path + ## If enabled, the common-path of all elements in the update is used. + # guess_path_tag = false + ## enable client-side TLS and define CA to authenticate the device # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/inputs/gnmi/tag_store.go b/plugins/inputs/gnmi/tag_store.go index a1b2a663c..357669734 100644 --- a/plugins/inputs/gnmi/tag_store.go +++ b/plugins/inputs/gnmi/tag_store.go @@ -2,12 +2,10 @@ package gnmi import ( "fmt" - "path/filepath" "sort" "strings" "github.com/influxdata/telegraf/internal" - gnmiLib "github.com/openconfig/gnmi/proto/gnmi" ) type tagStore struct { @@ -40,14 +38,19 @@ func newTagStore(subs []TagSubscription) *tagStore { } // Store tags extracted from TagSubscriptions -func (s *tagStore) insert(subscription TagSubscription, path *gnmiLib.Path, values map[string]interface{}, tags map[string]string) error { +func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values []updateField, tags map[string]string) error { switch subscription.Match { case "unconditional": - for k, v := range values { - tagName := subscription.Name + "/" + filepath.Base(k) - sv, err := internal.ToString(v) + for _, f := range values { + tagName := subscription.Name + if len(f.path.segments) > 0 { + key := f.path.segments[len(f.path.segments)-1] + key = strings.ReplaceAll(key, "-", "_") + tagName += "/" + key + } + sv, err := internal.ToString(f.value) if err != nil { - return fmt.Errorf("conversion error for %v: %w", v, err) + return fmt.Errorf("conversion error for %v: %w", f.value, err) } if sv == "" { delete(s.unconditional, tagName) @@ -68,11 +71,16 @@ func (s *tagStore) insert(subscription TagSubscription, path *gnmiLib.Path, valu } // Add the values - for k, v := range values { - tagName := subscription.Name + "/" + filepath.Base(k) - sv, err := internal.ToString(v) + for _, f := range values { + tagName := subscription.Name + if len(f.path.segments) > 0 { + key := f.path.segments[len(f.path.segments)-1] + key = strings.ReplaceAll(key, "-", "_") + tagName += "/" + key + } + sv, err := internal.ToString(f.value) if err != nil { - return fmt.Errorf("conversion error for %v: %w", v, err) + return fmt.Errorf("conversion error for %v: %w", f.value, err) } if sv == "" { delete(s.names[key], tagName) @@ -92,11 +100,16 @@ func (s *tagStore) insert(subscription TagSubscription, path *gnmiLib.Path, valu } // Add the values - for k, v := range values { - tagName := subscription.Name + "/" + filepath.Base(k) - sv, err := internal.ToString(v) + for _, f := range values { + tagName := subscription.Name + if len(f.path.segments) > 0 { + key := f.path.segments[len(f.path.segments)-1] + key = strings.ReplaceAll(key, "-", "_") + tagName += "/" + key + } + sv, err := internal.ToString(f.value) if err != nil { - return fmt.Errorf("conversion error for %v: %w", v, err) + return fmt.Errorf("conversion error for %v: %w", f.value, err) } if sv == "" { delete(s.elements.tags[key], tagName) @@ -111,7 +124,7 @@ func (s *tagStore) insert(subscription TagSubscription, path *gnmiLib.Path, valu return nil } -func (s *tagStore) lookup(path *gnmiLib.Path, metricTags map[string]string) map[string]string { +func (s *tagStore) lookup(path *pathInfo, metricTags map[string]string) map[string]string { // Add all unconditional tags tags := make(map[string]string, len(s.unconditional)) for k, v := range s.unconditional { @@ -140,9 +153,7 @@ func (s *tagStore) lookup(path *gnmiLib.Path, metricTags map[string]string) map[ return tags } -func (s *tagStore) getElementsKeys(path *gnmiLib.Path, elements []string) (string, bool) { - keyElements := pathKeys(path) - +func (s *tagStore) getElementsKeys(path *pathInfo, elements []string) (string, bool) { // Search for the required path elements and collect a ordered // list of their values to in the form // elementName1={keyA=valueA,keyB=valueB,...},...,elementNameN={keyY=valueY,keyZ=valueZ} @@ -151,9 +162,9 @@ func (s *tagStore) getElementsKeys(path *gnmiLib.Path, elements []string) (strin for _, requiredElement := range elements { var found bool var elementKVs []string - for _, el := range keyElements { - if el.Name == requiredElement { - for k, v := range el.Key { + for _, segment := range path.keyValues { + if segment.name == requiredElement { + for k, v := range segment.kv { elementKVs = append(elementKVs, k+"="+v) } found = true diff --git a/plugins/inputs/gnmi/testcases/issue_14044/expected.out b/plugins/inputs/gnmi/testcases/issue_14044/expected.out new file mode 100644 index 000000000..ecb759c67 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_14044/expected.out @@ -0,0 +1 @@ +ifdesc,name=FourHundredGigE0/2/0/3,path=openconfig-interfaces:/interfaces/interface/state,source=127.0.0.1 description="REDACTED" 1696324083211000000 \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_14044/responses.json b/plugins/inputs/gnmi/testcases/issue_14044/responses.json new file mode 100644 index 000000000..67558673a --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_14044/responses.json @@ -0,0 +1,33 @@ +[ + { + "update": { + "timestamp": "1696324083211000000", + "prefix": { + "origin": "openconfig-interfaces" + }, + "update": [ + { + "path": { + "elem": [ + { + "name": "interfaces" + }, + { + "name": "interface", + "key": { + "name": "FourHundredGigE0/2/0/3" + } + }, + { + "name": "state" + } + ] + }, + "val": { + "json_ietf_val": "eyJkZXNjcmlwdGlvbiI6IlJFREFDVEVEIn0=" + } + } + ] + } + } +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_14044/telegraf.conf b/plugins/inputs/gnmi/testcases/issue_14044/telegraf.conf new file mode 100644 index 000000000..903520d98 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_14044/telegraf.conf @@ -0,0 +1,12 @@ +[[inputs.gnmi]] + addresses = ["dummy"] + name_override = "gnmi" + redial = "10s" + encoding = "json_ietf" + guess_path_tag = true + [[inputs.gnmi.subscription]] + name = "ifdesc" + origin = "openconfig-interfaces" + path = '/interfaces/interface[name=FourHundredGigE*]/state/description' + subscription_mode = "sample" + sample_interval = "60s" diff --git a/plugins/inputs/gnmi/testcases/issue_14063/expected.out b/plugins/inputs/gnmi/testcases/issue_14063/expected.out new file mode 100644 index 000000000..911b2675d --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_14063/expected.out @@ -0,0 +1 @@ +ifcounters,path=oc-if:/interfaces/oc-if:interface/oc-if:state/oc-if:counters,source=127.0.0.1 in_1024_to_1518_octet_pkts=0u,in_128_to_255_octet_pkts=0u,in_1519_to_2047_octet_pkts=0u,in_2048_to_4095_octet_pkts=0u,in_256_to_511_octet_pkts=0u,in_4096_to_9216_octet_pkts=0u,in_512_to_1023_octet_pkts=0u,in_64_octet_pkts=0u,in_65_to_127_octet_pkts=0u,in_broadcast_pkts=0u,in_crc_error_pkts=0u,in_discards=0u,in_discards_octets=0u,in_dropped_octets=0u,in_dropped_pkts=0u,in_errors=0u,in_jabber_pkts=0u,in_multicast_pkts=0u,in_octets=0u,in_oversize_pkts=0u,in_pkts=0u,in_undersize_pkts=0u,in_unicast_pkts=0u,last_clear=1691859140059797458u,link_flap_events=0u,name="\\\"1\\",out_1519_to_2047_octet_pkts=0u,out_2048_to_4095_octet_pkts=0u,out_4096_to_9216_octet_pkts=0u,out_broadcast_pkts=0u,out_errors=0u,out_multicast_pkts=0u,out_octets=0u,out_pkts=0u,out_unicast_pkts=0u 1696617695101000000 diff --git a/plugins/inputs/gnmi/testcases/issue_14063/responses.json b/plugins/inputs/gnmi/testcases/issue_14063/responses.json new file mode 100644 index 000000000..f0784d196 --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_14063/responses.json @@ -0,0 +1,445 @@ +[ + { + "update": { + "timestamp": "1696617695101000000", + "prefix": { + "elem": [ + { + "name": "oc-if:interfaces" + }, + { + "name": "oc-if:interface" + }, + { + "name": "oc-if:state" + }, + { + "name": "oc-if:counters" + } + ] + }, + "update": [ + { + "path": { + "elem": [ + { + "name": "in-1024-to-1518-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-128-to-255-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-1519-to-2047-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-2048-to-4095-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-256-to-511-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-4096-to-9216-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-512-to-1023-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-64-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-65-to-127-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-broadcast-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-crc-error-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-discards" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-discards-octets" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-dropped-octets" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-dropped-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-errors" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-jabber-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-multicast-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-octets" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-oversize-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-undersize-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "in-unicast-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "last-clear" + } + ] + }, + "val": { + "uintVal": "1691859140059797458" + } + }, + { + "path": { + "elem": [ + { + "name": "link-flap-events" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "name" + } + ] + }, + "val": { + "stringVal": "\\\"1\\" + } + }, + { + "path": { + "elem": [ + { + "name": "out-1519-to-2047-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-2048-to-4095-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-4096-to-9216-octet-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-broadcast-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-errors" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-multicast-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-octets" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + }, + { + "path": { + "elem": [ + { + "name": "out-unicast-pkts" + } + ] + }, + "val": { + "uintVal": "0" + } + } + ] + } + } +] \ No newline at end of file diff --git a/plugins/inputs/gnmi/testcases/issue_14063/telegraf.conf b/plugins/inputs/gnmi/testcases/issue_14063/telegraf.conf new file mode 100644 index 000000000..5790be12b --- /dev/null +++ b/plugins/inputs/gnmi/testcases/issue_14063/telegraf.conf @@ -0,0 +1,11 @@ +[[inputs.gnmi]] + addresses = ["dummy"] + name_override = "gnmi" + redial = "10s" + + [[inputs.gnmi.subscription]] + name = "ifcounters" + origin = "openconfig-interfaces" + path = "/oc-if:interfaces/oc-if:interface/oc-if:state/oc-if:counters" + subscription_mode = "sample" + sample_interval = "30s" \ No newline at end of file diff --git a/plugins/inputs/gnmi/update_fields.go b/plugins/inputs/gnmi/update_fields.go new file mode 100644 index 000000000..36e35ac33 --- /dev/null +++ b/plugins/inputs/gnmi/update_fields.go @@ -0,0 +1,92 @@ +package gnmi + +import ( + "encoding/json" + "fmt" + "strconv" + + gnmiLib "github.com/openconfig/gnmi/proto/gnmi" + gnmiValue "github.com/openconfig/gnmi/value" +) + +type updateField struct { + path *pathInfo + value interface{} +} + +func newFieldsFromUpdate(path *pathInfo, update *gnmiLib.Update) ([]updateField, error) { + if update.Val == nil || update.Val.Value == nil { + return []updateField{{path: path}}, nil + } + + // Apply some special handling for special types + switch v := update.Val.Value.(type) { + case *gnmiLib.TypedValue_AsciiVal: // not handled in ToScalar + return []updateField{{path, v.AsciiVal}}, nil + case *gnmiLib.TypedValue_JsonVal: // requires special path handling + return processJSON(path, v.JsonVal) + case *gnmiLib.TypedValue_JsonIetfVal: // requires special path handling + return processJSON(path, v.JsonIetfVal) + } + + // Convert the protobuf "oneof" data to a Golang type. + value, err := gnmiValue.ToScalar(update.Val) + if err != nil { + return nil, err + } + return []updateField{{path, value}}, nil +} + +func processJSON(path *pathInfo, data []byte) ([]updateField, error) { + var nested interface{} + if err := json.Unmarshal(data, &nested); err != nil { + return nil, fmt.Errorf("failed to parse JSON value: %w", err) + } + + // Flatten the JSON data to get a key-value map + entries := flatten(nested) + + // Create an update-field with the complete path for all entries + fields := make([]updateField, 0, len(entries)) + for key, v := range entries { + fields = append(fields, updateField{ + path: path.appendSegments(key), + value: v, + }) + } + + return fields, nil +} + +func flatten(nested interface{}) map[string]interface{} { + fields := make(map[string]interface{}) + + switch n := nested.(type) { + case map[string]interface{}: + for k, child := range n { + for ck, cv := range flatten(child) { + key := k + if ck != "" { + key += "/" + ck + } + fields[key] = cv + } + } + case []interface{}: + for i, child := range n { + k := strconv.Itoa(i) + for ck, cv := range flatten(child) { + key := k + if ck != "" { + key += "/" + ck + } + fields[key] = cv + } + } + case nil: + return nil + default: + return map[string]interface{}{"": nested} + } + return fields +} diff --git a/plugins/inputs/gnmi/utils.go b/plugins/inputs/gnmi/utils.go deleted file mode 100644 index dd9e97f40..000000000 --- a/plugins/inputs/gnmi/utils.go +++ /dev/null @@ -1,154 +0,0 @@ -package gnmi - -import ( - "bytes" - "encoding/json" - "fmt" - "math" - "strings" - - gnmiLib "github.com/openconfig/gnmi/proto/gnmi" - - jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" -) - -// Parse path to path-buffer and tag-field -// -//nolint:revive //function-result-limit conditionally 4 return results allowed -func handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, aliases map[string]string, prefix string) (origin, path, alias string, err error) { - builder := bytes.NewBufferString(prefix) - - // Some devices do report the origin in the first path element - // so try to find out if this is the case. - if gnmiPath.Origin == "" && len(gnmiPath.Elem) > 0 { - groups := originPattern.FindStringSubmatch(gnmiPath.Elem[0].Name) - if len(groups) == 2 { - gnmiPath.Origin = groups[1] - gnmiPath.Elem[0].Name = gnmiPath.Elem[0].Name[len(groups[1])+1:] - } - } - - // Prefix with origin - if len(gnmiPath.Origin) > 0 { - origin = gnmiPath.Origin + ":" - } - - // Parse generic keys from prefix - for _, elem := range gnmiPath.Elem { - if len(elem.Name) > 0 { - if _, err := builder.WriteRune('/'); err != nil { - return "", "", "", err - } - if _, err := builder.WriteString(elem.Name); err != nil { - return "", "", "", err - } - } - name := builder.String() - - if _, exists := aliases[origin+name]; exists { - alias = origin + name - } else if _, exists := aliases[name]; exists { - alias = name - } - - if tags != nil { - for key, val := range elem.Key { - key = strings.ReplaceAll(key, "-", "_") - - // Use short-form of key if possible - if _, exists := tags[key]; exists { - tags[name+"/"+key] = val - } else { - tags[key] = val - } - } - } - } - - return origin, builder.String(), alias, nil -} - -// equalPathNoKeys checks if two gNMI paths are equal, without keys -func equalPathNoKeys(a *gnmiLib.Path, b *gnmiLib.Path) bool { - if len(a.Elem) != len(b.Elem) { - return false - } - for i := range a.Elem { - if a.Elem[i].Name != b.Elem[i].Name { - return false - } - } - return true -} - -func pathKeys(gpath *gnmiLib.Path) []*gnmiLib.PathElem { - var newPath []*gnmiLib.PathElem - for _, elem := range gpath.Elem { - if elem.Key != nil { - newPath = append(newPath, elem) - } - } - return newPath -} - -func pathWithPrefix(prefix *gnmiLib.Path, gpath *gnmiLib.Path) *gnmiLib.Path { - if prefix == nil { - return gpath - } - fullPath := new(gnmiLib.Path) - fullPath.Origin = prefix.Origin - fullPath.Target = prefix.Target - fullPath.Elem = append(prefix.Elem, gpath.Elem...) - return fullPath -} - -func gnmiToFields(name string, updateVal *gnmiLib.TypedValue) (map[string]interface{}, error) { - var value interface{} - var jsondata []byte - - // Make sure a value is actually set - if updateVal == nil || updateVal.Value == nil { - return nil, nil - } - - switch val := updateVal.Value.(type) { - case *gnmiLib.TypedValue_AsciiVal: - value = val.AsciiVal - case *gnmiLib.TypedValue_BoolVal: - value = val.BoolVal - case *gnmiLib.TypedValue_BytesVal: - value = val.BytesVal - case *gnmiLib.TypedValue_DoubleVal: - value = val.DoubleVal - case *gnmiLib.TypedValue_DecimalVal: - //nolint:staticcheck // to maintain backward compatibility with older gnmi specs - value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision)) - case *gnmiLib.TypedValue_FloatVal: - //nolint:staticcheck // to maintain backward compatibility with older gnmi specs - value = val.FloatVal - case *gnmiLib.TypedValue_IntVal: - value = val.IntVal - case *gnmiLib.TypedValue_StringVal: - value = val.StringVal - case *gnmiLib.TypedValue_UintVal: - value = val.UintVal - case *gnmiLib.TypedValue_JsonIetfVal: - jsondata = val.JsonIetfVal - case *gnmiLib.TypedValue_JsonVal: - jsondata = val.JsonVal - } - - fields := make(map[string]interface{}) - if value != nil { - fields[name] = value - } else if jsondata != nil { - if err := json.Unmarshal(jsondata, &value); err != nil { - return nil, fmt.Errorf("failed to parse JSON value: %w", err) - } - flattener := jsonparser.JSONFlattener{Fields: fields} - if err := flattener.FullFlattenJSON(name, value, true, true); err != nil { - return nil, fmt.Errorf("failed to flatten JSON: %w", err) - } - } - return fields, nil -}