diff --git a/plugins/inputs/cisco_telemetry_mdt/README.md b/plugins/inputs/cisco_telemetry_mdt/README.md index 8cc5439ff..8ceadfeb3 100644 --- a/plugins/inputs/cisco_telemetry_mdt/README.md +++ b/plugins/inputs/cisco_telemetry_mdt/README.md @@ -158,4 +158,5 @@ multicast pim NXAPI show ip pim route vrf all multicast pim NXAPI show ip pim rp vrf all multicast pim NXAPI show ip pim statistics vrf all multicast pim NXAPI show ip pim vrf all +microburst NATIVE path microburst ``` diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go index 61e9a7fed..20cd1019a 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -559,6 +559,48 @@ func (c *CiscoTelemetryMDT) parseRib(grouper *metric.SeriesGrouper, field *telem } } +func (c *CiscoTelemetryMDT) parseMicroburst(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, + encodingPath string, tags map[string]string, timestamp time.Time) { + var nxMicro *telemetry.TelemetryField + var nxMicro1 *telemetry.TelemetryField + // Microburst + measurement := encodingPath + if len(field.Fields) > 3 { + nxMicro = field.Fields[2] + if len(nxMicro.Fields) > 0 { + nxMicro1 = nxMicro.Fields[0] + if len(nxMicro1.Fields) >= 3 { + nxMicro = nxMicro1.Fields[3] + } + } + } + for _, subfield := range nxMicro.Fields { + if subfield.Name == "interfaceName" { + tags[subfield.Name] = decodeTag(subfield) + } + + for _, subf := range subfield.Fields { + switch subf.Name { + case "sourceName": + newstr := strings.Split(decodeTag(subf), "-[") + if len(newstr) <= 2 { + tags[subf.Name] = decodeTag(subf) + } else { + intfName := strings.Split(newstr[1], "]") + queue := strings.Split(newstr[2], "]") + tags["interface_name"] = intfName[0] + tags["queue_number"] = queue[0] + } + case "startTs": + tags[subf.Name] = decodeTag(subf) + } + if value := decodeValue(subf); value != nil { + grouper.Add(measurement, tags, timestamp, subf.Name, value) + } + } + } +} + func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, encodingPath string, tags map[string]string, timestamp time.Time) { // DME structure: https://developer.cisco.com/site/nxapi-dme-model-reference-api/ @@ -569,6 +611,11 @@ func (c *CiscoTelemetryMDT) parseClassAttributeField(grouper *metric.SeriesGroup c.parseRib(grouper, field, encodingPath, tags, timestamp) return } + if encodingPath == "microburst" { + //dump microburst + c.parseMicroburst(grouper, field, encodingPath, tags, timestamp) + return + } if field == nil || !isDme || len(field.Fields) == 0 || len(field.Fields[0].Fields) == 0 || len(field.Fields[0].Fields[0].Fields) == 0 { return } diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go index 880330ed7..b0ff020d9 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -6,6 +6,7 @@ import ( "errors" "io" "net" + "os" "testing" "time" @@ -22,7 +23,13 @@ import ( ) func TestHandleTelemetryTwoSimple(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{ + "alias": "type:model/some/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -90,11 +97,22 @@ func TestHandleTelemetryTwoSimple(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags := map[string]string{"path": "type:model/some/path", "name": "str", "uint64": "1234", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "name": "str", + "uint64": "1234", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"bool": true} acc.AssertContainsTaggedFields(t, "alias", fields, tags) - tags = map[string]string{"path": "type:model/some/path", "name": "str2", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/some/path", + "name": "str2", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"bool": false} acc.AssertContainsTaggedFields(t, "alias", fields, tags) } @@ -268,7 +286,13 @@ func TestIncludeDeleteField(t *testing.T) { } func TestHandleTelemetrySingleNested(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nested": "type:model/nested/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{ + "nested": "type:model/nested/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -330,13 +354,22 @@ func TestHandleTelemetrySingleNested(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags := map[string]string{"path": "type:model/nested/path", "level": "3", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/nested/path", + "level": "3", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"nested/value/foo": "bar"} acc.AssertContainsTaggedFields(t, "nested", fields, tags) } func TestHandleEmbeddedTags(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"extra": "type:model/extra"}, EmbeddedTags: []string{"type:model/extra/list/name"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"extra": "type:model/extra"}, + EmbeddedTags: []string{"type:model/extra/list/name"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -400,16 +433,31 @@ func TestHandleEmbeddedTags(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags1 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry1"} + tags1 := map[string]string{ + "path": "type:model/extra", + "foo": "bar", + "source": "hostname", + "subscription": "subscription", + "list/name": "entry1", + } fields1 := map[string]interface{}{"list/test": "foo"} - tags2 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry2"} + tags2 := map[string]string{ + "path": "type:model/extra", + "foo": "bar", + "source": "hostname", + "subscription": "subscription", + "list/name": "entry2", + } fields2 := map[string]interface{}{"list/test": "bar"} acc.AssertContainsTaggedFields(t, "extra", fields1, tags1) acc.AssertContainsTaggedFields(t, "extra", fields2, tags2) } func TestHandleNXAPI(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"nxapi": "show nxapi"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -489,16 +537,34 @@ func TestHandleNXAPI(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags1 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i1", "row_number": "0", "source": "hostname", "subscription": "subscription"} + tags1 := map[string]string{ + "path": "show nxapi", + "foo": "bar", + "TABLE_nxapi": "i1", + "row_number": "0", + "source": "hostname", + "subscription": "subscription", + } fields1 := map[string]interface{}{"value": "foo"} - tags2 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i2", "row_number": "0", "source": "hostname", "subscription": "subscription"} + tags2 := map[string]string{ + "path": "show nxapi", + "foo": "bar", + "TABLE_nxapi": "i2", + "row_number": "0", + "source": "hostname", + "subscription": "subscription", + } fields2 := map[string]interface{}{"value": "bar"} acc.AssertContainsTaggedFields(t, "nxapi", fields1, tags1) acc.AssertContainsTaggedFields(t, "nxapi", fields2, tags2) } func TestHandleNXAPIXformNXAPI(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{"nxapi": "show nxapi"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -579,7 +645,10 @@ func TestHandleNXAPIXformNXAPI(t *testing.T) { } func TestHandleNXXformMulti(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/lldp"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"dme": "sys/lldp"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -659,12 +728,20 @@ func TestHandleNXXformMulti(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) //validate various transformation scenaarios newly added in the code. - fields := map[string]interface{}{"portIdV": "12", "portDesc": "100", "test": int64(281474976710655), "subscriptionId": "2814749767106551"} + fields := map[string]interface{}{ + "portIdV": "12", + "portDesc": "100", + "test": int64(281474976710655), + "subscriptionId": "2814749767106551", + } acc.AssertContainsFields(t, "dme", fields) } func TestHandleNXDME(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/dme"}} + c := &CiscoTelemetryMDT{ + Transport: "dummy", + Aliases: map[string]string{"dme": "sys/dme"}, + } acc := &testutil.Accumulator{} err := c.Start(acc) // error is expected since we are passing in dummy transport @@ -732,13 +809,23 @@ func TestHandleNXDME(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags1 := map[string]string{"path": "sys/dme", "foo": "bar", "fooEntity": "some-rn", "source": "hostname", "subscription": "subscription"} + tags1 := map[string]string{ + "path": "sys/dme", + "foo": "bar", + "fooEntity": "some-rn", + "source": "hostname", + "subscription": "subscription", + } fields1 := map[string]interface{}{"value": "foo"} acc.AssertContainsTaggedFields(t, "dme", fields1, tags1) } func TestTCPDialoutOverflow(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "tcp", ServiceAddress: "127.0.0.1:0"} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "tcp", + ServiceAddress: "127.0.0.1:0", + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -764,6 +851,20 @@ func TestTCPDialoutOverflow(t *testing.T) { require.Contains(t, acc.Errors, errors.New("dialout packet too long: 1000000000")) } +func mockTelemetryMicroburstMessage() *telemetryBis.Telemetry { + data, err := os.ReadFile("./testdata/microburst") + if err != nil { + panic(err) + } + + newMessage := &telemetryBis.Telemetry{} + err = proto.Unmarshal(data, newMessage) + if err != nil { + panic(err) + } + return newMessage +} + func mockTelemetryMessage() *telemetryBis.Telemetry { return &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, @@ -797,9 +898,57 @@ func mockTelemetryMessage() *telemetryBis.Telemetry { } } +func TestGRPCDialoutMicroburst(t *testing.T) { + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + Aliases: map[string]string{ + "some": "microburst", + "parallel": "type:model/parallel/path", + "other": "type:model/other/path", + }, + } + acc := &testutil.Accumulator{} + err := c.Start(acc) + require.NoError(t, err) + + telemetry := mockTelemetryMicroburstMessage() + data, err := proto.Marshal(telemetry) + require.NoError(t, err) + + c.handleTelemetry(data) + require.Empty(t, acc.Errors) + tags := map[string]string{ + "microburst": "microburst", + "path": "microburst", + "source": "n9k-eor-tm4", + "subscription": "1", + } + fields := map[string]interface{}{ + "duration": uint64(1200), + "endDepth": int64(0), + "interfaceName": "Eth0/0/0", + "peak": int64(500), + "queue": "queue-255", + "queueType": "unicast", + "threshold": int64(0), + "ts": "2023-08-03T20:12:59.655308Z", + } + acc.AssertContainsTaggedFields(t, "microburst", fields, tags) +} + func TestTCPDialoutMultiple(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "tcp", ServiceAddress: "127.0.0.1:0", Aliases: map[string]string{ - "some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "tcp", + ServiceAddress: "127.0.0.1:0", + Aliases: map[string]string{ + "some": "type:model/some/path", + "parallel": "type:model/parallel/path", + "other": "type:model/other/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -858,21 +1007,40 @@ func TestTCPDialoutMultiple(t *testing.T) { // We use the invalid dialout flags to let the server close the connection require.Equal(t, acc.Errors, []error{errors.New("invalid dialout flags: 257"), errors.New("invalid dialout flags: 257")}) - tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "some", fields, tags) - tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/parallel/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "parallel", fields, tags) - tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/other/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "other", fields, tags) } func TestGRPCDialoutError(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0"} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -896,8 +1064,16 @@ func TestGRPCDialoutError(t *testing.T) { } func TestGRPCDialoutMultiple(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", Aliases: map[string]string{ - "some": "type:model/some/path", "parallel": "type:model/parallel/path", "other": "type:model/other/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + Aliases: map[string]string{ + "some": "type:model/some/path", + "parallel": "type:model/parallel/path", + "other": "type:model/other/path", + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -945,24 +1121,44 @@ func TestGRPCDialoutMultiple(t *testing.T) { require.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: testclose"), errors.New("GRPC dialout error: testclose")}) - tags := map[string]string{"path": "type:model/some/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "some", fields, tags) - tags = map[string]string{"path": "type:model/parallel/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/parallel/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "parallel", fields, tags) - tags = map[string]string{"path": "type:model/other/path", "name": "str", "source": "hostname", "subscription": "subscription"} + tags = map[string]string{ + "path": "type:model/other/path", + "name": "str", + "source": "hostname", + "subscription": "subscription", + } fields = map[string]interface{}{"value": int64(-1)} acc.AssertContainsTaggedFields(t, "other", fields, tags) } func TestGRPCDialoutKeepalive(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", EnforcementPolicy: GRPCEnforcementPolicy{ - PermitKeepaliveWithoutCalls: true, - KeepaliveMinTime: 0, - }} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "grpc", + ServiceAddress: "127.0.0.1:0", + EnforcementPolicy: GRPCEnforcementPolicy{ + PermitKeepaliveWithoutCalls: true, + KeepaliveMinTime: 0, + }, + } acc := &testutil.Accumulator{} err := c.Start(acc) require.NoError(t, err) @@ -985,7 +1181,11 @@ func TestGRPCDialoutKeepalive(t *testing.T) { } func TestSourceFieldRewrite(t *testing.T) { - c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}} + c := &CiscoTelemetryMDT{ + Log: testutil.Logger{}, + Transport: "dummy", + Aliases: map[string]string{"alias": "type:model/some/path"}, + } c.SourceFieldName = "mdt_source" acc := &testutil.Accumulator{} err := c.Start(acc) @@ -1028,7 +1228,12 @@ func TestSourceFieldRewrite(t *testing.T) { c.handleTelemetry(data) require.Empty(t, acc.Errors) - tags := map[string]string{"path": "type:model/some/path", "mdt_source": "str", "source": "hostname", "subscription": "subscription"} + tags := map[string]string{ + "path": "type:model/some/path", + "mdt_source": "str", + "source": "hostname", + "subscription": "subscription", + } fields := map[string]interface{}{"bool": false} acc.AssertContainsTaggedFields(t, "alias", fields, tags) } diff --git a/plugins/inputs/cisco_telemetry_mdt/testdata/microburst b/plugins/inputs/cisco_telemetry_mdt/testdata/microburst new file mode 100644 index 000000000..6a97ddfe0 Binary files /dev/null and b/plugins/inputs/cisco_telemetry_mdt/testdata/microburst differ