feat(inputs.cisco_telemetry_mdt): include delete field (#12345)
This commit is contained in:
parent
dc89a0401c
commit
607bfdbc97
2
go.mod
2
go.mod
|
|
@ -51,7 +51,7 @@ require (
|
|||
github.com/blues/jsonata-go v1.5.4
|
||||
github.com/bmatcuk/doublestar/v3 v3.0.0
|
||||
github.com/caio/go-tdigest v3.1.0+incompatible
|
||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c
|
||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df
|
||||
github.com/coocood/freecache v1.2.2
|
||||
github.com/coreos/go-semver v0.3.0
|
||||
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -558,8 +558,8 @@ github.com/cilium/ebpf v0.6.2/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJ
|
|||
github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA=
|
||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
|
||||
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
|
||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c h1:k3y2XtIffIk230a+e0d7vbs5ebTvH3OcCMKN/jS6IAY=
|
||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c/go.mod h1:rJDd05J5hqWVU9MjJ+5jw1CuLn/jRhvU0xtFEzzqjwM=
|
||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df h1:GmrltUp5Qf5XhT+LmqMDizsgm/6VHTSxPWRdrq21yRo=
|
||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df/go.mod h1:rJDd05J5hqWVU9MjJ+5jw1CuLn/jRhvU0xtFEzzqjwM=
|
||||
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=
|
||||
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
|
|
|
|||
|
|
@ -47,6 +47,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
## Define (for certain nested telemetry measurements with embedded tags) which fields are tags
|
||||
# embedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"]
|
||||
|
||||
## Include the delete field in every telemetry message.
|
||||
# include_delete_field = false
|
||||
|
||||
## Define aliases to map telemetry encoding paths to simple measurement names
|
||||
[inputs.cisco_telemetry_mdt.aliases]
|
||||
ifstats = "ietf-interfaces:interfaces-state/interface/statistics"
|
||||
|
|
|
|||
|
|
@ -51,13 +51,14 @@ type GRPCEnforcementPolicy struct {
|
|||
// CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms
|
||||
type CiscoTelemetryMDT struct {
|
||||
// Common configuration
|
||||
Transport string
|
||||
Transport string `toml:"transport"`
|
||||
ServiceAddress string `toml:"service_address"`
|
||||
MaxMsgSize int `toml:"max_msg_size"`
|
||||
Aliases map[string]string `toml:"aliases"`
|
||||
Dmes map[string]string `toml:"dmes"`
|
||||
EmbeddedTags []string `toml:"embedded_tags"`
|
||||
EnforcementPolicy GRPCEnforcementPolicy `toml:"grpc_enforcement_policy"`
|
||||
IncludeDeleteField bool `toml:"include_delete_field"`
|
||||
|
||||
Log telegraf.Logger
|
||||
|
||||
|
|
@ -386,24 +387,28 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
// if the keys and content fields are missing, skip the message as it
|
||||
// does not have parsable data used by Telegraf
|
||||
if keys == nil || content == nil {
|
||||
if content == nil && !c.IncludeDeleteField {
|
||||
c.Log.Debug("Message skipped because no content found and include of delete field not enabled")
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse keys
|
||||
if keys != nil {
|
||||
tags = make(map[string]string, len(keys.Fields)+3)
|
||||
for _, subfield := range keys.Fields {
|
||||
c.parseKeyField(tags, subfield, "")
|
||||
}
|
||||
} else {
|
||||
tags = make(map[string]string, 3)
|
||||
}
|
||||
// Parse keys
|
||||
tags["source"] = msg.GetNodeIdStr()
|
||||
if msgID := msg.GetSubscriptionIdStr(); msgID != "" {
|
||||
tags["subscription"] = msgID
|
||||
}
|
||||
tags["path"] = msg.GetEncodingPath()
|
||||
|
||||
for _, subfield := range keys.Fields {
|
||||
c.parseKeyField(tags, subfield, "")
|
||||
}
|
||||
encodingPath := msg.GetEncodingPath()
|
||||
tags["path"] = encodingPath
|
||||
|
||||
if content != nil {
|
||||
// Parse values
|
||||
for _, subfield := range content.Fields {
|
||||
prefix := ""
|
||||
|
|
@ -413,7 +418,20 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
|
|||
case "class-stats":
|
||||
prefix = subfield.Fields[0].Fields[1].GetStringValue()
|
||||
}
|
||||
c.parseContentField(grouper, subfield, prefix, msg.EncodingPath, tags, timestamp)
|
||||
c.parseContentField(grouper, subfield, prefix, encodingPath, tags, timestamp)
|
||||
}
|
||||
}
|
||||
if c.IncludeDeleteField {
|
||||
grouper.Add(c.getMeasurementName(encodingPath), tags, timestamp, "delete", gpbkv.GetDelete())
|
||||
}
|
||||
|
||||
if content == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse values
|
||||
for _, subfield := range content.Fields {
|
||||
c.parseContentField(grouper, subfield, "", encodingPath, tags, timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -560,6 +578,22 @@ func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGroup
|
|||
}
|
||||
}
|
||||
|
||||
func (c *CiscoTelemetryMDT) getMeasurementName(encodingPath string) string {
|
||||
// Do alias lookup, to shorten measurement names
|
||||
measurement := encodingPath
|
||||
if alias, ok := c.internalAliases[encodingPath]; ok {
|
||||
measurement = alias
|
||||
} else {
|
||||
c.mutex.Lock()
|
||||
if _, haveWarned := c.warned[encodingPath]; !haveWarned {
|
||||
c.Log.Debugf("No measurement alias for encoding path: %s", encodingPath)
|
||||
c.warned[encodingPath] = struct{}{}
|
||||
}
|
||||
c.mutex.Unlock()
|
||||
}
|
||||
return measurement
|
||||
}
|
||||
|
||||
func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, prefix string,
|
||||
encodingPath string, tags map[string]string, timestamp time.Time) {
|
||||
name := strings.ReplaceAll(field.Name, "-", "_")
|
||||
|
|
@ -576,19 +610,7 @@ func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, fie
|
|||
extraTags := c.extraTags[strings.ReplaceAll(encodingPath, "-", "_")+"/"+name]
|
||||
|
||||
if value := decodeValue(field); value != nil {
|
||||
// Do alias lookup, to shorten measurement names
|
||||
measurement := encodingPath
|
||||
if alias, ok := c.internalAliases[encodingPath]; ok {
|
||||
measurement = alias
|
||||
} else {
|
||||
c.mutex.Lock()
|
||||
if _, haveWarned := c.warned[encodingPath]; !haveWarned {
|
||||
c.Log.Debugf("No measurement alias for encoding path: %s", encodingPath)
|
||||
c.warned[encodingPath] = struct{}{}
|
||||
}
|
||||
c.mutex.Unlock()
|
||||
}
|
||||
|
||||
measurement := c.getMeasurementName(encodingPath)
|
||||
if val := c.nxosValueXform(field, value, encodingPath); val != nil {
|
||||
grouper.Add(measurement, tags, timestamp, name, val)
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout"
|
||||
telemetryBis "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
|
||||
|
|
@ -15,6 +16,8 @@ import (
|
|||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
|
|
@ -96,6 +99,174 @@ func TestHandleTelemetryTwoSimple(t *testing.T) {
|
|||
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
|
||||
}
|
||||
|
||||
func TestIncludeDeleteField(t *testing.T) {
|
||||
type TelemetryEntry struct {
|
||||
name string
|
||||
fieldName string
|
||||
uint32Value uint32
|
||||
uint64Value uint64
|
||||
stringValue string
|
||||
}
|
||||
encodingPath := TelemetryEntry{
|
||||
name: "path",
|
||||
stringValue: "openconfig-interfaces:interfaces/interface/subinterfaces/subinterface/openconfig-if-ip:ipv6/addresses/address",
|
||||
}
|
||||
name := TelemetryEntry{name: "name", stringValue: "Loopback10"}
|
||||
index := TelemetryEntry{name: "index", stringValue: "0"}
|
||||
ip := TelemetryEntry{name: "ip", fieldName: "state/ip", stringValue: "10::10"}
|
||||
prefixLength := TelemetryEntry{name: "prefix-length", fieldName: "state/prefix_length", uint32Value: uint32(128), uint64Value: 128}
|
||||
origin := TelemetryEntry{name: "origin", fieldName: "state/origin", stringValue: "STATIC"}
|
||||
status := TelemetryEntry{name: "status", fieldName: "state/status", stringValue: "PREFERRED"}
|
||||
source := TelemetryEntry{name: "source", stringValue: "hostname"}
|
||||
subscription := TelemetryEntry{name: "subscription", stringValue: "subscription"}
|
||||
deleteKey := "delete"
|
||||
stateKey := "state"
|
||||
|
||||
testCases := []struct {
|
||||
telemetry *telemetryBis.Telemetry
|
||||
expected []telegraf.Metric
|
||||
}{{
|
||||
telemetry: &telemetryBis.Telemetry{
|
||||
MsgTimestamp: 1543236572000,
|
||||
EncodingPath: encodingPath.stringValue,
|
||||
NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: source.stringValue},
|
||||
Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: subscription.stringValue},
|
||||
DataGpbkv: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Fields: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Name: "keys",
|
||||
Fields: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Name: name.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: name.stringValue},
|
||||
},
|
||||
{
|
||||
Name: index.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: index.uint32Value},
|
||||
},
|
||||
{
|
||||
Name: ip.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: ip.stringValue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "content",
|
||||
Fields: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Name: stateKey,
|
||||
Fields: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Name: ip.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: ip.stringValue},
|
||||
},
|
||||
{
|
||||
Name: prefixLength.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: prefixLength.uint32Value},
|
||||
},
|
||||
{
|
||||
Name: origin.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: origin.stringValue},
|
||||
},
|
||||
{
|
||||
Name: status.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: status.stringValue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: []telegraf.Metric{
|
||||
metric.New(
|
||||
"deleted",
|
||||
map[string]string{
|
||||
encodingPath.name: encodingPath.stringValue,
|
||||
name.name: name.stringValue,
|
||||
index.name: index.stringValue,
|
||||
ip.name: ip.stringValue,
|
||||
source.name: source.stringValue,
|
||||
subscription.name: subscription.stringValue,
|
||||
},
|
||||
map[string]interface{}{
|
||||
deleteKey: false,
|
||||
ip.fieldName: ip.stringValue,
|
||||
prefixLength.fieldName: prefixLength.uint64Value,
|
||||
origin.fieldName: origin.stringValue,
|
||||
status.fieldName: status.stringValue,
|
||||
},
|
||||
time.Now(),
|
||||
)},
|
||||
},
|
||||
{
|
||||
telemetry: &telemetryBis.Telemetry{
|
||||
MsgTimestamp: 1543236572000,
|
||||
EncodingPath: encodingPath.stringValue,
|
||||
NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: source.stringValue},
|
||||
Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: subscription.stringValue},
|
||||
DataGpbkv: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Delete: true,
|
||||
Fields: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Name: "keys",
|
||||
Fields: []*telemetryBis.TelemetryField{
|
||||
{
|
||||
Name: name.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: name.stringValue},
|
||||
},
|
||||
{
|
||||
Name: index.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: index.uint32Value},
|
||||
},
|
||||
{
|
||||
Name: ip.name,
|
||||
ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: ip.stringValue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: []telegraf.Metric{
|
||||
metric.New(
|
||||
"deleted",
|
||||
map[string]string{
|
||||
encodingPath.name: encodingPath.stringValue,
|
||||
name.name: name.stringValue,
|
||||
index.name: index.stringValue,
|
||||
ip.name: ip.stringValue,
|
||||
source.name: source.stringValue,
|
||||
subscription.name: subscription.stringValue,
|
||||
},
|
||||
map[string]interface{}{deleteKey: true},
|
||||
time.Now(),
|
||||
)},
|
||||
},
|
||||
}
|
||||
for _, test := range testCases {
|
||||
c := &CiscoTelemetryMDT{
|
||||
Log: testutil.Logger{},
|
||||
Transport: "dummy",
|
||||
Aliases: map[string]string{"deleted": encodingPath.stringValue},
|
||||
IncludeDeleteField: true}
|
||||
acc := &testutil.Accumulator{}
|
||||
// error is expected since we are passing in dummy transport
|
||||
require.ErrorContains(t, c.Start(acc), "dummy")
|
||||
data, err := proto.Marshal(test.telemetry)
|
||||
require.NoError(t, err)
|
||||
|
||||
c.handleTelemetry(data)
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, test.expected, actual, testutil.IgnoreTime())
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleTelemetrySingleNested(t *testing.T) {
|
||||
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nested": "type:model/nested/path"}}
|
||||
acc := &testutil.Accumulator{}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,9 @@
|
|||
## Define (for certain nested telemetry measurements with embedded tags) which fields are tags
|
||||
# embedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"]
|
||||
|
||||
## Include the delete field in every telemetry message.
|
||||
# include_delete_field = false
|
||||
|
||||
## Define aliases to map telemetry encoding paths to simple measurement names
|
||||
[inputs.cisco_telemetry_mdt.aliases]
|
||||
ifstats = "ietf-interfaces:interfaces-state/interface/statistics"
|
||||
|
|
|
|||
Loading…
Reference in New Issue