feat(inputs.cisco_telemetry_mdt): Add microbust support (#13877)

This commit is contained in:
Shangxin Du 2023-09-07 06:27:12 -07:00 committed by GitHub
parent a528e842ef
commit 7df1e53a4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 287 additions and 34 deletions

View File

@ -158,4 +158,5 @@ multicast pim NXAPI show ip pim route vrf all
multicast pim NXAPI show ip pim rp vrf all
multicast pim NXAPI show ip pim statistics vrf all
multicast pim NXAPI show ip pim vrf all
microburst NATIVE path microburst
```

View File

@ -559,6 +559,48 @@ func (c *CiscoTelemetryMDT) parseRib(grouper *metric.SeriesGrouper, field *telem
}
}
func (c *CiscoTelemetryMDT) parseMicroburst(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField,
encodingPath string, tags map[string]string, timestamp time.Time) {
var nxMicro *telemetry.TelemetryField
var nxMicro1 *telemetry.TelemetryField
// Microburst
measurement := encodingPath
if len(field.Fields) > 3 {
nxMicro = field.Fields[2]
if len(nxMicro.Fields) > 0 {
nxMicro1 = nxMicro.Fields[0]
if len(nxMicro1.Fields) >= 3 {
nxMicro = nxMicro1.Fields[3]
}
}
}
for _, subfield := range nxMicro.Fields {
if subfield.Name == "interfaceName" {
tags[subfield.Name] = decodeTag(subfield)
}
for _, subf := range subfield.Fields {
switch subf.Name {
case "sourceName":
newstr := strings.Split(decodeTag(subf), "-[")
if len(newstr) <= 2 {
tags[subf.Name] = decodeTag(subf)
} else {
intfName := strings.Split(newstr[1], "]")
queue := strings.Split(newstr[2], "]")
tags["interface_name"] = intfName[0]
tags["queue_number"] = queue[0]
}
case "startTs":
tags[subf.Name] = decodeTag(subf)
}
if value := decodeValue(subf); value != nil {
grouper.Add(measurement, tags, timestamp, subf.Name, value)
}
}
}
}
func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField,
encodingPath string, tags map[string]string, timestamp time.Time) {
// DME structure: https://developer.cisco.com/site/nxapi-dme-model-reference-api/
@ -569,6 +611,11 @@ func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGroup
c.parseRib(grouper, field, encodingPath, tags, timestamp)
return
}
if encodingPath == "microburst" {
//dump microburst
c.parseMicroburst(grouper, field, encodingPath, tags, timestamp)
return
}
if field == nil || !isDme || len(field.Fields) == 0 || len(field.Fields[0].Fields) == 0 || len(field.Fields[0].Fields[0].Fields) == 0 {
return
}

View File

@ -6,6 +6,7 @@ import (
"errors"
"io"
"net"
"os"
"testing"
"time"
@ -22,7 +23,13 @@ import (
)
func TestHandleTelemetryTwoSimple(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "dummy",
Aliases: map[string]string{
"alias": "type:model/some/path",
},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
// error is expected since we are passing in dummy transport
@ -90,11 +97,22 @@ func TestHandleTelemetryTwoSimple(t *testing.T) {
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
tags := map[string]string{"path": "type:model/some/path", "name": "str", "uint64": "1234", "source": "hostname", "subscription": "subscription"}
tags := map[string]string{
"path": "type:model/some/path",
"name": "str",
"uint64": "1234",
"source": "hostname",
"subscription": "subscription",
}
fields := map[string]interface{}{"bool": true}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
tags = map[string]string{"path": "type:model/some/path", "name": "str2", "source": "hostname", "subscription": "subscription"}
tags = map[string]string{
"path": "type:model/some/path",
"name": "str2",
"source": "hostname",
"subscription": "subscription",
}
fields = map[string]interface{}{"bool": false}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
}
@ -268,7 +286,13 @@ func TestIncludeDeleteField(t *testing.T) {
}
func TestHandleTelemetrySingleNested(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nested": "type:model/nested/path"}}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "dummy",
Aliases: map[string]string{
"nested": "type:model/nested/path",
},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
// error is expected since we are passing in dummy transport
@ -330,13 +354,22 @@ func TestHandleTelemetrySingleNested(t *testing.T) {
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
tags := map[string]string{"path": "type:model/nested/path", "level": "3", "source": "hostname", "subscription": "subscription"}
tags := map[string]string{
"path": "type:model/nested/path",
"level": "3",
"source": "hostname",
"subscription": "subscription",
}
fields := map[string]interface{}{"nested/value/foo": "bar"}
acc.AssertContainsTaggedFields(t, "nested", fields, tags)
}
func TestHandleEmbeddedTags(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"extra": "type:model/extra"}, EmbeddedTags: []string{"type:model/extra/list/name"}}
c := &CiscoTelemetryMDT{
Transport: "dummy",
Aliases: map[string]string{"extra": "type:model/extra"},
EmbeddedTags: []string{"type:model/extra/list/name"},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
// error is expected since we are passing in dummy transport
@ -400,16 +433,31 @@ func TestHandleEmbeddedTags(t *testing.T) {
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
tags1 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry1"}
tags1 := map[string]string{
"path": "type:model/extra",
"foo": "bar",
"source": "hostname",
"subscription": "subscription",
"list/name": "entry1",
}
fields1 := map[string]interface{}{"list/test": "foo"}
tags2 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry2"}
tags2 := map[string]string{
"path": "type:model/extra",
"foo": "bar",
"source": "hostname",
"subscription": "subscription",
"list/name": "entry2",
}
fields2 := map[string]interface{}{"list/test": "bar"}
acc.AssertContainsTaggedFields(t, "extra", fields1, tags1)
acc.AssertContainsTaggedFields(t, "extra", fields2, tags2)
}
func TestHandleNXAPI(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}}
c := &CiscoTelemetryMDT{
Transport: "dummy",
Aliases: map[string]string{"nxapi": "show nxapi"},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
// error is expected since we are passing in dummy transport
@ -489,16 +537,34 @@ func TestHandleNXAPI(t *testing.T) {
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
tags1 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i1", "row_number": "0", "source": "hostname", "subscription": "subscription"}
tags1 := map[string]string{
"path": "show nxapi",
"foo": "bar",
"TABLE_nxapi": "i1",
"row_number": "0",
"source": "hostname",
"subscription": "subscription",
}
fields1 := map[string]interface{}{"value": "foo"}
tags2 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i2", "row_number": "0", "source": "hostname", "subscription": "subscription"}
tags2 := map[string]string{
"path": "show nxapi",
"foo": "bar",
"TABLE_nxapi": "i2",
"row_number": "0",
"source": "hostname",
"subscription": "subscription",
}
fields2 := map[string]interface{}{"value": "bar"}
acc.AssertContainsTaggedFields(t, "nxapi", fields1, tags1)
acc.AssertContainsTaggedFields(t, "nxapi", fields2, tags2)
}
func TestHandleNXAPIXformNXAPI(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "dummy",
Aliases: map[string]string{"nxapi": "show nxapi"},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
// error is expected since we are passing in dummy transport
@ -579,7 +645,10 @@ func TestHandleNXAPIXformNXAPI(t *testing.T) {
}
func TestHandleNXXformMulti(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/lldp"}}
c := &CiscoTelemetryMDT{
Transport: "dummy",
Aliases: map[string]string{"dme": "sys/lldp"},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
// error is expected since we are passing in dummy transport
@ -659,12 +728,20 @@ func TestHandleNXXformMulti(t *testing.T) {
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
//validate various transformation scenaarios newly added in the code.
fields := map[string]interface{}{"portIdV": "12", "portDesc": "100", "test": int64(281474976710655), "subscriptionId": "2814749767106551"}
fields := map[string]interface{}{
"portIdV": "12",
"portDesc": "100",
"test": int64(281474976710655),
"subscriptionId": "2814749767106551",
}
acc.AssertContainsFields(t, "dme", fields)
}
func TestHandleNXDME(t *testing.T) {
c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/dme"}}
c := &CiscoTelemetryMDT{
Transport: "dummy",
Aliases: map[string]string{"dme": "sys/dme"},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
// error is expected since we are passing in dummy transport
@ -732,13 +809,23 @@ func TestHandleNXDME(t *testing.T) {
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
tags1 := map[string]string{"path": "sys/dme", "foo": "bar", "fooEntity": "some-rn", "source": "hostname", "subscription": "subscription"}
tags1 := map[string]string{
"path": "sys/dme",
"foo": "bar",
"fooEntity": "some-rn",
"source": "hostname",
"subscription": "subscription",
}
fields1 := map[string]interface{}{"value": "foo"}
acc.AssertContainsTaggedFields(t, "dme", fields1, tags1)
}
func TestTCPDialoutOverflow(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "tcp", ServiceAddress: "127.0.0.1:0"}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "tcp",
ServiceAddress: "127.0.0.1:0",
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
require.NoError(t, err)
@ -764,6 +851,20 @@ func TestTCPDialoutOverflow(t *testing.T) {
require.Contains(t, acc.Errors, errors.New("dialout packet too long: 1000000000"))
}
func mockTelemetryMicroburstMessage() *telemetryBis.Telemetry {
data, err := os.ReadFile("./testdata/microburst")
if err != nil {
panic(err)
}
newMessage := &telemetryBis.Telemetry{}
err = proto.Unmarshal(data, newMessage)
if err != nil {
panic(err)
}
return newMessage
}
func mockTelemetryMessage() *telemetryBis.Telemetry {
return &telemetryBis.Telemetry{
MsgTimestamp: 1543236572000,
@ -797,9 +898,57 @@ func mockTelemetryMessage() *telemetryBis.Telemetry {
}
}
func TestGRPCDialoutMicroburst(t *testing.T) {
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "grpc",
ServiceAddress: "127.0.0.1:0",
Aliases: map[string]string{
"some": "microburst",
"parallel": "type:model/parallel/path",
"other": "type:model/other/path",
},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
require.NoError(t, err)
telemetry := mockTelemetryMicroburstMessage()
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
tags := map[string]string{
"microburst": "microburst",
"path": "microburst",
"source": "n9k-eor-tm4",
"subscription": "1",
}
fields := map[string]interface{}{
"duration": uint64(1200),
"endDepth": int64(0),
"interfaceName": "Eth0/0/0",
"peak": int64(500),
"queue": "queue-255",
"queueType": "unicast",
"threshold": int64(0),
"ts": "2023-08-03T20:12:59.655308Z",
}
acc.AssertContainsTaggedFields(t, "microburst", fields, tags)
}
func TestTCPDialoutMultiple(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "tcp", ServiceAddress: "127.0.0.1:0", Aliases: map[string]string{
"some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "tcp",
ServiceAddress: "127.0.0.1:0",
Aliases: map[string]string{
"some": "type:model/some/path",
"parallel": "type:model/parallel/path",
"other": "type:model/other/path",
},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
require.NoError(t, err)
@ -858,21 +1007,40 @@ func TestTCPDialoutMultiple(t *testing.T) {
// We use the invalid dialout flags to let the server close the connection
require.Equal(t, acc.Errors, []error{errors.New("invalid dialout flags: 257"), errors.New("invalid dialout flags: 257")})
tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"}
tags := map[string]string{
"path": "type:model/some/path",
"name": "str",
"source": "hostname",
"subscription": "subscription",
}
fields := map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "some", fields, tags)
tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"}
tags = map[string]string{
"path": "type:model/parallel/path",
"name": "str",
"source": "hostname",
"subscription": "subscription",
}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "parallel", fields, tags)
tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"}
tags = map[string]string{
"path": "type:model/other/path",
"name": "str",
"source": "hostname",
"subscription": "subscription",
}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "other", fields, tags)
}
func TestGRPCDialoutError(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0"}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "grpc",
ServiceAddress: "127.0.0.1:0",
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
require.NoError(t, err)
@ -896,8 +1064,16 @@ func TestGRPCDialoutError(t *testing.T) {
}
func TestGRPCDialoutMultiple(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", Aliases: map[string]string{
"some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "grpc",
ServiceAddress: "127.0.0.1:0",
Aliases: map[string]string{
"some": "type:model/some/path",
"parallel": "type:model/parallel/path",
"other": "type:model/other/path",
},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
require.NoError(t, err)
@ -945,24 +1121,44 @@ func TestGRPCDialoutMultiple(t *testing.T) {
require.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: testclose"), errors.New("GRPC dialout error: testclose")})
tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"}
tags := map[string]string{
"path": "type:model/some/path",
"name": "str",
"source": "hostname",
"subscription": "subscription",
}
fields := map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "some", fields, tags)
tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"}
tags = map[string]string{
"path": "type:model/parallel/path",
"name": "str",
"source": "hostname",
"subscription": "subscription",
}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "parallel", fields, tags)
tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"}
tags = map[string]string{
"path": "type:model/other/path",
"name": "str",
"source": "hostname",
"subscription": "subscription",
}
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "other", fields, tags)
}
func TestGRPCDialoutKeepalive(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", EnforcementPolicy: GRPCEnforcementPolicy{
PermitKeepaliveWithoutCalls: true,
KeepaliveMinTime: 0,
}}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "grpc",
ServiceAddress: "127.0.0.1:0",
EnforcementPolicy: GRPCEnforcementPolicy{
PermitKeepaliveWithoutCalls: true,
KeepaliveMinTime: 0,
},
}
acc := &testutil.Accumulator{}
err := c.Start(acc)
require.NoError(t, err)
@ -985,7 +1181,11 @@ func TestGRPCDialoutKeepalive(t *testing.T) {
}
func TestSourceFieldRewrite(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}}
c := &CiscoTelemetryMDT{
Log: testutil.Logger{},
Transport: "dummy",
Aliases: map[string]string{"alias": "type:model/some/path"},
}
c.SourceFieldName = "mdt_source"
acc := &testutil.Accumulator{}
err := c.Start(acc)
@ -1028,7 +1228,12 @@ func TestSourceFieldRewrite(t *testing.T) {
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
tags := map[string]string{"path": "type:model/some/path", "mdt_source": "str", "source": "hostname", "subscription": "subscription"}
tags := map[string]string{
"path": "type:model/some/path",
"mdt_source": "str",
"source": "hostname",
"subscription": "subscription",
}
fields := map[string]interface{}{"bool": false}
acc.AssertContainsTaggedFields(t, "alias", fields, tags)
}

Binary file not shown.