From 607bfdbc978ec29188b4b00e7d922e57d73bc331 Mon Sep 17 00:00:00 2001 From: Severin Dellsperger Date: Wed, 25 Jan 2023 20:17:47 +0100 Subject: [PATCH] feat(inputs.cisco_telemetry_mdt): include delete field (#12345) --- go.mod | 2 +- go.sum | 4 +- plugins/inputs/cisco_telemetry_mdt/README.md | 3 + .../cisco_telemetry_mdt.go | 92 ++++++---- .../cisco_telemetry_mdt_test.go | 171 ++++++++++++++++++ .../inputs/cisco_telemetry_mdt/sample.conf | 3 + 6 files changed, 237 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index e8c28fcec..17fe819dd 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/blues/jsonata-go v1.5.4 github.com/bmatcuk/doublestar/v3 v3.0.0 github.com/caio/go-tdigest v3.1.0+incompatible - github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c + github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df github.com/coocood/freecache v1.2.2 github.com/coreos/go-semver v0.3.0 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f diff --git a/go.sum b/go.sum index 83a6dc65f..0bc9548c1 100644 --- a/go.sum +++ b/go.sum @@ -558,8 +558,8 @@ github.com/cilium/ebpf v0.6.2/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJ github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= -github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c h1:k3y2XtIffIk230a+e0d7vbs5ebTvH3OcCMKN/jS6IAY= -github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c/go.mod h1:rJDd05J5hqWVU9MjJ+5jw1CuLn/jRhvU0xtFEzzqjwM= +github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df h1:GmrltUp5Qf5XhT+LmqMDizsgm/6VHTSxPWRdrq21yRo= +github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df/go.mod h1:rJDd05J5hqWVU9MjJ+5jw1CuLn/jRhvU0xtFEzzqjwM= github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= diff --git a/plugins/inputs/cisco_telemetry_mdt/README.md b/plugins/inputs/cisco_telemetry_mdt/README.md index 52ce7675e..28a9b15a5 100644 --- a/plugins/inputs/cisco_telemetry_mdt/README.md +++ b/plugins/inputs/cisco_telemetry_mdt/README.md @@ -47,6 +47,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Define (for certain nested telemetry measurements with embedded tags) which fields are tags # embedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"] + ## Include the delete field in every telemetry message. + # include_delete_field = false + ## Define aliases to map telemetry encoding paths to simple measurement names [inputs.cisco_telemetry_mdt.aliases] ifstats = "ietf-interfaces:interfaces-state/interface/statistics" diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go index 84733a802..5f97be5eb 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -51,13 +51,14 @@ type GRPCEnforcementPolicy struct { // CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms type CiscoTelemetryMDT struct { // Common configuration - Transport string - ServiceAddress string `toml:"service_address"` - MaxMsgSize int `toml:"max_msg_size"` - Aliases map[string]string `toml:"aliases"` - Dmes map[string]string `toml:"dmes"` - EmbeddedTags []string `toml:"embedded_tags"` - EnforcementPolicy GRPCEnforcementPolicy `toml:"grpc_enforcement_policy"` + Transport string `toml:"transport"` + ServiceAddress string `toml:"service_address"` + MaxMsgSize int `toml:"max_msg_size"` + Aliases map[string]string `toml:"aliases"` + Dmes map[string]string `toml:"dmes"` + EmbeddedTags []string `toml:"embedded_tags"` + EnforcementPolicy GRPCEnforcementPolicy `toml:"grpc_enforcement_policy"` + IncludeDeleteField bool `toml:"include_delete_field"` Log telegraf.Logger @@ -386,34 +387,51 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { } } - // if the keys and content fields are missing, skip the message as it - // does not have parsable data used by Telegraf - if keys == nil || content == nil { + if content == nil && !c.IncludeDeleteField { + c.Log.Debug("Message skipped because no content found and include of delete field not enabled") continue } + if keys != nil { + tags = make(map[string]string, len(keys.Fields)+3) + for _, subfield := range keys.Fields { + c.parseKeyField(tags, subfield, "") + } + } else { + tags = make(map[string]string, 3) + } // Parse keys - tags = make(map[string]string, len(keys.Fields)+3) tags["source"] = msg.GetNodeIdStr() if msgID := msg.GetSubscriptionIdStr(); msgID != "" { tags["subscription"] = msgID } - tags["path"] = msg.GetEncodingPath() + encodingPath := msg.GetEncodingPath() + tags["path"] = encodingPath - for _, subfield := range keys.Fields { - c.parseKeyField(tags, subfield, "") + if content != nil { + // Parse values + for _, subfield := range content.Fields { + prefix := "" + switch subfield.Name { + case "operation-metric": + prefix = subfield.Fields[0].Fields[0].GetStringValue() + case "class-stats": + prefix = subfield.Fields[0].Fields[1].GetStringValue() + } + c.parseContentField(grouper, subfield, prefix, encodingPath, tags, timestamp) + } + } + if c.IncludeDeleteField { + grouper.Add(c.getMeasurementName(encodingPath), tags, timestamp, "delete", gpbkv.GetDelete()) + } + + if content == nil { + continue } // Parse values for _, subfield := range content.Fields { - prefix := "" - switch subfield.Name { - case "operation-metric": - prefix = subfield.Fields[0].Fields[0].GetStringValue() - case "class-stats": - prefix = subfield.Fields[0].Fields[1].GetStringValue() - } - c.parseContentField(grouper, subfield, prefix, msg.EncodingPath, tags, timestamp) + c.parseContentField(grouper, subfield, "", encodingPath, tags, timestamp) } } @@ -560,6 +578,22 @@ func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGroup } } +func (c *CiscoTelemetryMDT) getMeasurementName(encodingPath string) string { + // Do alias lookup, to shorten measurement names + measurement := encodingPath + if alias, ok := c.internalAliases[encodingPath]; ok { + measurement = alias + } else { + c.mutex.Lock() + if _, haveWarned := c.warned[encodingPath]; !haveWarned { + c.Log.Debugf("No measurement alias for encoding path: %s", encodingPath) + c.warned[encodingPath] = struct{}{} + } + c.mutex.Unlock() + } + return measurement +} + func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, prefix string, encodingPath string, tags map[string]string, timestamp time.Time) { name := strings.ReplaceAll(field.Name, "-", "_") @@ -576,19 +610,7 @@ func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, fie extraTags := c.extraTags[strings.ReplaceAll(encodingPath, "-", "_")+"/"+name] if value := decodeValue(field); value != nil { - // Do alias lookup, to shorten measurement names - measurement := encodingPath - if alias, ok := c.internalAliases[encodingPath]; ok { - measurement = alias - } else { - c.mutex.Lock() - if _, haveWarned := c.warned[encodingPath]; !haveWarned { - c.Log.Debugf("No measurement alias for encoding path: %s", encodingPath) - c.warned[encodingPath] = struct{}{} - } - c.mutex.Unlock() - } - + measurement := c.getMeasurementName(encodingPath) if val := c.nxosValueXform(field, value, encodingPath); val != nil { grouper.Add(measurement, tags, timestamp, name, val) } else { diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go index 04a786dae..25fb2227b 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -7,6 +7,7 @@ import ( "io" "net" "testing" + "time" dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout" telemetryBis "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" @@ -15,6 +16,8 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -96,6 +99,174 @@ func TestHandleTelemetryTwoSimple(t *testing.T) { acc.AssertContainsTaggedFields(t, "alias", fields, tags) } +func TestIncludeDeleteField(t *testing.T) { + type TelemetryEntry struct { + name string + fieldName string + uint32Value uint32 + uint64Value uint64 + stringValue string + } + encodingPath := TelemetryEntry{ + name: "path", + stringValue: "openconfig-interfaces:interfaces/interface/subinterfaces/subinterface/openconfig-if-ip:ipv6/addresses/address", + } + name := TelemetryEntry{name: "name", stringValue: "Loopback10"} + index := TelemetryEntry{name: "index", stringValue: "0"} + ip := TelemetryEntry{name: "ip", fieldName: "state/ip", stringValue: "10::10"} + prefixLength := TelemetryEntry{name: "prefix-length", fieldName: "state/prefix_length", uint32Value: uint32(128), uint64Value: 128} + origin := TelemetryEntry{name: "origin", fieldName: "state/origin", stringValue: "STATIC"} + status := TelemetryEntry{name: "status", fieldName: "state/status", stringValue: "PREFERRED"} + source := TelemetryEntry{name: "source", stringValue: "hostname"} + subscription := TelemetryEntry{name: "subscription", stringValue: "subscription"} + deleteKey := "delete" + stateKey := "state" + + testCases := []struct { + telemetry *telemetryBis.Telemetry + expected []telegraf.Metric + }{{ + telemetry: &telemetryBis.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: encodingPath.stringValue, + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: source.stringValue}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: subscription.stringValue}, + DataGpbkv: []*telemetryBis.TelemetryField{ + { + Fields: []*telemetryBis.TelemetryField{ + { + Name: "keys", + Fields: []*telemetryBis.TelemetryField{ + { + Name: name.name, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: name.stringValue}, + }, + { + Name: index.name, + ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: index.uint32Value}, + }, + { + Name: ip.name, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: ip.stringValue}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetryBis.TelemetryField{ + { + Name: stateKey, + Fields: []*telemetryBis.TelemetryField{ + { + Name: ip.name, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: ip.stringValue}, + }, + { + Name: prefixLength.name, + ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: prefixLength.uint32Value}, + }, + { + Name: origin.name, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: origin.stringValue}, + }, + { + Name: status.name, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: status.stringValue}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: []telegraf.Metric{ + metric.New( + "deleted", + map[string]string{ + encodingPath.name: encodingPath.stringValue, + name.name: name.stringValue, + index.name: index.stringValue, + ip.name: ip.stringValue, + source.name: source.stringValue, + subscription.name: subscription.stringValue, + }, + map[string]interface{}{ + deleteKey: false, + ip.fieldName: ip.stringValue, + prefixLength.fieldName: prefixLength.uint64Value, + origin.fieldName: origin.stringValue, + status.fieldName: status.stringValue, + }, + time.Now(), + )}, + }, + { + telemetry: &telemetryBis.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: encodingPath.stringValue, + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: source.stringValue}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: subscription.stringValue}, + DataGpbkv: []*telemetryBis.TelemetryField{ + { + Delete: true, + Fields: []*telemetryBis.TelemetryField{ + { + Name: "keys", + Fields: []*telemetryBis.TelemetryField{ + { + Name: name.name, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: name.stringValue}, + }, + { + Name: index.name, + ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: index.uint32Value}, + }, + { + Name: ip.name, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: ip.stringValue}, + }, + }, + }, + }, + }, + }, + }, + expected: []telegraf.Metric{ + metric.New( + "deleted", + map[string]string{ + encodingPath.name: encodingPath.stringValue, + name.name: name.stringValue, + index.name: index.stringValue, + ip.name: ip.stringValue, + source.name: source.stringValue, + subscription.name: subscription.stringValue, + }, + map[string]interface{}{deleteKey: true}, + time.Now(), + )}, + }, + } + for _, test := range testCases { + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{"deleted": encodingPath.stringValue}, + IncludeDeleteField: true} + acc := &testutil.Accumulator{} + // error is expected since we are passing in dummy transport + require.ErrorContains(t, c.Start(acc), "dummy") + data, err := proto.Marshal(test.telemetry) + require.NoError(t, err) + + c.handleTelemetry(data) + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, test.expected, actual, testutil.IgnoreTime()) + } +} + func TestHandleTelemetrySingleNested(t *testing.T) { c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nested": "type:model/nested/path"}} acc := &testutil.Accumulator{} diff --git a/plugins/inputs/cisco_telemetry_mdt/sample.conf b/plugins/inputs/cisco_telemetry_mdt/sample.conf index 689133aa2..64023b8b0 100644 --- a/plugins/inputs/cisco_telemetry_mdt/sample.conf +++ b/plugins/inputs/cisco_telemetry_mdt/sample.conf @@ -21,6 +21,9 @@ ## Define (for certain nested telemetry measurements with embedded tags) which fields are tags # embedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"] + ## Include the delete field in every telemetry message. + # include_delete_field = false + ## Define aliases to map telemetry encoding paths to simple measurement names [inputs.cisco_telemetry_mdt.aliases] ifstats = "ietf-interfaces:interfaces-state/interface/statistics"