From 7df1e53a4b20c3f0b6253df685315ae1e496e6b8 Mon Sep 17 00:00:00 2001 From: Shangxin Du Date: Thu, 7 Sep 2023 06:27:12 -0700 Subject: [PATCH] feat(inputs.cisco_telemetry_mdt): Add microbust support (#13877) --- plugins/inputs/cisco_telemetry_mdt/README.md | 1 + .../cisco_telemetry_mdt.go | 47 +++ .../cisco_telemetry_mdt_test.go | 273 +++++++++++++++--- .../cisco_telemetry_mdt/testdata/microburst | Bin 0 -> 5804 bytes 4 files changed, 287 insertions(+), 34 deletions(-) create mode 100644 plugins/inputs/cisco_telemetry_mdt/testdata/microburst diff --git a/plugins/inputs/cisco_telemetry_mdt/README.md b/plugins/inputs/cisco_telemetry_mdt/README.md index 8cc5439ff..8ceadfeb3 100644 --- a/plugins/inputs/cisco_telemetry_mdt/README.md +++ b/plugins/inputs/cisco_telemetry_mdt/README.md @@ -158,4 +158,5 @@ multicast pim NXAPI show ip pim route vrf all multicast pim NXAPI show ip pim rp vrf all multicast pim NXAPI show ip pim statistics vrf all multicast pim NXAPI show ip pim vrf all +microburst NATIVE path microburst ``` diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go index 61e9a7fed..20cd1019a 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -559,6 +559,48 @@ func (c *CiscoTelemetryMDT) parseRib(grouper *metric.SeriesGrouper, field *telem } } +func (c *CiscoTelemetryMDT) parseMicroburst(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, + encodingPath string, tags map[string]string, timestamp time.Time) { + var nxMicro *telemetry.TelemetryField + var nxMicro1 *telemetry.TelemetryField + // Microburst + measurement := encodingPath + if len(field.Fields) > 3 { + nxMicro = field.Fields[2] + if len(nxMicro.Fields) > 0 { + nxMicro1 = nxMicro.Fields[0] + if len(nxMicro1.Fields) >= 3 { + nxMicro = nxMicro1.Fields[3] + } + } + } + for _, subfield := range nxMicro.Fields { + if subfield.Name == "interfaceName" { + tags[subfield.Name] = decodeTag(subfield) + } + + for _, subf := range subfield.Fields { + switch subf.Name { + case "sourceName": + newstr := strings.Split(decodeTag(subf), "-[") + if len(newstr) <= 2 { + tags[subf.Name] = decodeTag(subf) + } else { + intfName := strings.Split(newstr[1], "]") + queue := strings.Split(newstr[2], "]") + tags["interface_name"] = intfName[0] + tags["queue_number"] = queue[0] + } + case "startTs": + tags[subf.Name] = decodeTag(subf) + } + if value := decodeValue(subf); value != nil { + grouper.Add(measurement, tags, timestamp, subf.Name, value) + } + } + } +} + func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, encodingPath string, tags map[string]string, timestamp time.Time) { // DME structure: https://developer.cisco.com/site/nxapi-dme-model-reference-api/ @@ -569,6 +611,11 @@ func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGroup c.parseRib(grouper, field, encodingPath, tags, timestamp) return } + if encodingPath == "microburst" { + //dump microburst + c.parseMicroburst(grouper, field, encodingPath, tags, timestamp) + return + } if field == nil || !isDme || len(field.Fields) == 0 || len(field.Fields[0].Fields) == 0 || len(field.Fields[0].Fields[0].Fields) == 0 { return } diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go index 880330ed7..b0ff020d9 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -6,6 +6,7 @@ import ( "errors" "io" "net" + "os" "testing" "time" @@ -22,7 +23,13 @@ import ( ) func TestHandleTelemetryTwoSimple(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{ + "alias": "type:model/some/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -90,11 +97,22 @@ func TestHandleTelemetryTwoSimple(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags := map[string]string{"path": "type:model/some/path", "name": "str", "uint64": "1234", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "name": "str", + "uint64": "1234", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"bool": true} acc.AssertContainsTaggedFields(t, "alias", fields, tags) - tags = map[string]string{"path": "type:model/some/path", "name": "str2", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/some/path", + "name": "str2", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"bool": false} acc.AssertContainsTaggedFields(t, "alias", fields, tags) } @@ -268,7 +286,13 @@ func TestIncludeDeleteField(t *testing.T) { } func TestHandleTelemetrySingleNested(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nested": "type:model/nested/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{ + "nested": "type:model/nested/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -330,13 +354,22 @@ func TestHandleTelemetrySingleNested(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags := map[string]string{"path": "type:model/nested/path", "level": "3", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/nested/path", + "level": "3", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"nested/value/foo": "bar"} acc.AssertContainsTaggedFields(t, "nested", fields, tags) } func TestHandleEmbeddedTags(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"extra": "type:model/extra"}, EmbeddedTags: []string{"type:model/extra/list/name"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"extra": "type:model/extra"}, + EmbeddedTags: []string{"type:model/extra/list/name"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -400,16 +433,31 @@ func TestHandleEmbeddedTags(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags1 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry1"} + tags1 := map[string]string{ + "path": "type:model/extra", + "foo": "bar", + "source": "hostname", + "subscription": "subscription", + "list/name": "entry1", + } fields1 := map[string]interface{}{"list/test": "foo"} - tags2 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry2"} + tags2 := map[string]string{ + "path": "type:model/extra", + "foo": "bar", + "source": "hostname", + "subscription": "subscription", + "list/name": "entry2", + } fields2 := map[string]interface{}{"list/test": "bar"} acc.AssertContainsTaggedFields(t, "extra", fields1, tags1) acc.AssertContainsTaggedFields(t, "extra", fields2, tags2) } func TestHandleNXAPI(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"nxapi": "show nxapi"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -489,16 +537,34 @@ func TestHandleNXAPI(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags1 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i1", "row_number": "0", "source": "hostname", "subscription": "subscription"} + tags1 := map[string]string{ + "path": "show nxapi", + "foo": "bar", + "TABLE_nxapi": "i1", + "row_number": "0", + "source": "hostname", + "subscription": "subscription", + } fields1 := map[string]interface{}{"value": "foo"} - tags2 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i2", "row_number": "0", "source": "hostname", "subscription": "subscription"} + tags2 := map[string]string{ + "path": "show nxapi", + "foo": "bar", + "TABLE_nxapi": "i2", + "row_number": "0", + "source": "hostname", + "subscription": "subscription", + } fields2 := map[string]interface{}{"value": "bar"} acc.AssertContainsTaggedFields(t, "nxapi", fields1, tags1) acc.AssertContainsTaggedFields(t, "nxapi", fields2, tags2) } func TestHandleNXAPIXformNXAPI(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{"nxapi": "show nxapi"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -579,7 +645,10 @@ func TestHandleNXAPIXformNXAPI(t *testing.T) { } func TestHandleNXXformMulti(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/lldp"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"dme": "sys/lldp"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -659,12 +728,20 @@ func TestHandleNXXformMulti(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) //validate various transformation scenaarios newly added in the code. - fields := map[string]interface{}{"portIdV": "12", "portDesc": "100", "test": int64(281474976710655), "subscriptionId": "2814749767106551"} + fields := map[string]interface{}{ + "portIdV": "12", + "portDesc": "100", + "test": int64(281474976710655), + "subscriptionId": "2814749767106551", + } acc.AssertContainsFields(t, "dme", fields) } func TestHandleNXDME(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/dme"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"dme": "sys/dme"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -732,13 +809,23 @@ func TestHandleNXDME(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags1 := map[string]string{"path": "sys/dme", "foo": "bar", "fooEntity": "some-rn", "source": "hostname", "subscription": "subscription"} + tags1 := map[string]string{ + "path": "sys/dme", + "foo": "bar", + "fooEntity": "some-rn", + "source": "hostname", + "subscription": "subscription", + } fields1 := map[string]interface{}{"value": "foo"} acc.AssertContainsTaggedFields(t, "dme", fields1, tags1) } func TestTCPDialoutOverflow(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "tcp", ServiceAddress: "127.0.0.1:0"} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "tcp", + ServiceAddress: "127.0.0.1:0", + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -764,6 +851,20 @@ func TestTCPDialoutOverflow(t *testing.T) { require.Contains(t, acc.Errors, errors.New("dialout packet too long: 1000000000")) } +func mockTelemetryMicroburstMessage() *telemetryBis.Telemetry { + data, err := os.ReadFile("./testdata/microburst") + if err != nil { + panic(err) + } + + newMessage := &telemetryBis.Telemetry{} + err = proto.Unmarshal(data, newMessage) + if err != nil { + panic(err) + } + return newMessage +} + func mockTelemetryMessage() *telemetryBis.Telemetry { return &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, @@ -797,9 +898,57 @@ func mockTelemetryMessage() *telemetryBis.Telemetry { } } +func TestGRPCDialoutMicroburst(t *testing.T) { + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + Aliases: map[string]string{ + "some": "microburst", + "parallel": "type:model/parallel/path", + "other": "type:model/other/path", + }, + } + acc := &testutil.Accumulator{} + err := c.Start(acc) + require.NoError(t, err) + + telemetry := mockTelemetryMicroburstMessage() + data, err := proto.Marshal(telemetry) + require.NoError(t, err) + + c.handleTelemetry(data) + require.Empty(t, acc.Errors) + tags := map[string]string{ + "microburst": "microburst", + "path": "microburst", + "source": "n9k-eor-tm4", + "subscription": "1", + } + fields := map[string]interface{}{ + "duration": uint64(1200), + "endDepth": int64(0), + "interfaceName": "Eth0/0/0", + "peak": int64(500), + "queue": "queue-255", + "queueType": "unicast", + "threshold": int64(0), + "ts": "2023-08-03T20:12:59.655308Z", + } + acc.AssertContainsTaggedFields(t, "microburst", fields, tags) +} + func TestTCPDialoutMultiple(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "tcp", ServiceAddress: "127.0.0.1:0", Aliases: map[string]string{ - "some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "tcp", + ServiceAddress: "127.0.0.1:0", + Aliases: map[string]string{ + "some": "type:model/some/path", + "parallel": "type:model/parallel/path", + "other": "type:model/other/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -858,21 +1007,40 @@ func TestTCPDialoutMultiple(t *testing.T) { // We use the invalid dialout flags to let the server close the connection require.Equal(t, acc.Errors, []error{errors.New("invalid dialout flags: 257"), errors.New("invalid dialout flags: 257")}) - tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "some", fields, tags) - tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/parallel/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "parallel", fields, tags) - tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/other/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "other", fields, tags) } func TestGRPCDialoutError(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0"} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -896,8 +1064,16 @@ func TestGRPCDialoutError(t *testing.T) { } func TestGRPCDialoutMultiple(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", Aliases: map[string]string{ - "some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + Aliases: map[string]string{ + "some": "type:model/some/path", + "parallel": "type:model/parallel/path", + "other": "type:model/other/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -945,24 +1121,44 @@ func TestGRPCDialoutMultiple(t *testing.T) { require.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: testclose"), errors.New("GRPC dialout error: testclose")}) - tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "some", fields, tags) - tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/parallel/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "parallel", fields, tags) - tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/other/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "other", fields, tags) } func TestGRPCDialoutKeepalive(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", EnforcementPolicy: GRPCEnforcementPolicy{ - PermitKeepaliveWithoutCalls: true, - KeepaliveMinTime: 0, - }} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + EnforcementPolicy: GRPCEnforcementPolicy{ + PermitKeepaliveWithoutCalls: true, + KeepaliveMinTime: 0, + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -985,7 +1181,11 @@ func TestGRPCDialoutKeepalive(t *testing.T) { } func TestSourceFieldRewrite(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{"alias": "type:model/some/path"}, + } c.SourceFieldName = "mdt_source" acc := &testutil.Accumulator{} err := c.Start(acc) @@ -1028,7 +1228,12 @@ func TestSourceFieldRewrite(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags := map[string]string{"path": "type:model/some/path", "mdt_source": "str", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "mdt_source": "str", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"bool": false} acc.AssertContainsTaggedFields(t, "alias", fields, tags) } diff --git a/plugins/inputs/cisco_telemetry_mdt/testdata/microburst b/plugins/inputs/cisco_telemetry_mdt/testdata/microburst new file mode 100644 index 0000000000000000000000000000000000000000..6a97ddfe0c30fd5b09530a24c72c24fc6c3ffbb4 GIT binary patch literal 5804 zcmdUyKX21O6u{dwO&q6Tcq)Hd1eArrii!WGPN#~(fP|Vz&45x>$Z@WUNn!`zIa1%q zhhSmh8^FfM%mU&AuydwuX$#XTrb$OXkakQt+7+W4~xRWA$}2^jup;OId`&z?@#6-{P@!QQc|=*mTOg%T{{3uPa*{?ECuv0#|CF-XnX_>cXn-_|BbWJMaQLZ)523`Ua(Cs5odG_8YO<(^^{KM0aW3iTy(=;U zya$`f1S`Y$p^~PuGTsAu36NuZl5UWiQKI!vht@SF_4EqSS!+q=^cv6^?t<)1T}oe& z1(_;aysb{tj*T}!K6i(a@A28z@Fr9iN%mo+)Q_H@9CcqEzf`*?ScmE&*{y_9ulyg( zadI?A7s4FJ7T6q)s1W8@dDa|51amB16WAP%s2JuLIo2Fg2y={mfz9EFieZkGqdArk z=2(`%=5R#CFvrf(99sx;+9Ln0IHF>hljjM!ErdCa$Tf!}Duy|o9Ba}6Ea7XgE@Zzq^#Er literal 0 HcmV?d00001