parent
143cabc4b7
commit
6be3bd8c9c
|
|
@ -240,14 +240,17 @@ func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Co
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleSubscribeResponse message from gNMI and parse contained telemetry data
|
|
||||||
func (c *GNMI) handleSubscribeResponse(address string, reply *gnmi.SubscribeResponse) {
|
func (c *GNMI) handleSubscribeResponse(address string, reply *gnmi.SubscribeResponse) {
|
||||||
// Check if response is a gNMI Update and if we have a prefix to derive the measurement name
|
switch response := reply.Response.(type) {
|
||||||
response, ok := reply.Response.(*gnmi.SubscribeResponse_Update)
|
case *gnmi.SubscribeResponse_Update:
|
||||||
if !ok {
|
c.handleSubscribeResponseUpdate(address, response)
|
||||||
return
|
case *gnmi.SubscribeResponse_Error:
|
||||||
|
c.Log.Errorf("Subscribe error (%d), %q", response.Error.Code, response.Error.Message)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
|
||||||
|
func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmi.SubscribeResponse_Update) {
|
||||||
var prefix, prefixAliasPath string
|
var prefix, prefixAliasPath string
|
||||||
grouper := metric.NewSeriesGrouper()
|
grouper := metric.NewSeriesGrouper()
|
||||||
timestamp := time.Unix(0, response.Update.Timestamp)
|
timestamp := time.Unix(0, response.Update.Timestamp)
|
||||||
|
|
|
||||||
|
|
@ -403,6 +403,30 @@ func TestNotification(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MockLogger struct {
|
||||||
|
telegraf.Logger
|
||||||
|
lastFormat string
|
||||||
|
lastArgs []interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *MockLogger) Errorf(format string, args ...interface{}) {
|
||||||
|
l.lastFormat = format
|
||||||
|
l.lastArgs = args
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribeResponseError(t *testing.T) {
|
||||||
|
me := "mock error message"
|
||||||
|
var mc uint32 = 7
|
||||||
|
ml := &MockLogger{}
|
||||||
|
plugin := &GNMI{Log: ml}
|
||||||
|
errorResponse := &gnmi.SubscribeResponse_Error{
|
||||||
|
Error: &gnmi.Error{Message: me, Code: mc}}
|
||||||
|
plugin.handleSubscribeResponse(
|
||||||
|
"127.0.0.1:0", &gnmi.SubscribeResponse{Response: errorResponse})
|
||||||
|
require.NotEmpty(t, ml.lastFormat)
|
||||||
|
require.Equal(t, ml.lastArgs, []interface{}{mc, me})
|
||||||
|
}
|
||||||
|
|
||||||
func TestRedial(t *testing.T) {
|
func TestRedial(t *testing.T) {
|
||||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue