feat(inputs.cisco_telemetry_mdt): Add GRPC Keepalive/timeout config options (#11458)

This commit is contained in:
Danial Ebling 2022-07-18 13:07:42 -06:00 committed by GitHub
parent b5076363e4
commit 1fa47c8221
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 9 deletions

View File

@ -55,8 +55,34 @@ later.
# dnpath = '{"Name": "show ip route summary","prop": [{"Key": "routes","Value": "string"}, {"Key": "best-paths","Value": "string"}]}'
# dnpath2 = '{"Name": "show processes cpu","prop": [{"Key": "kernel_percent","Value": "float"}, {"Key": "idle_percent","Value": "float"}, {"Key": "process","Value": "string"}, {"Key": "user_percent","Value": "float"}, {"Key": "onesec","Value": "float"}]}'
# dnpath3 = '{"Name": "show processes memory physical","prop": [{"Key": "processname","Value": "string"}]}'
## Additional GRPC connection settings.
[inputs.cisco_telemetry_mdt.grpc_enforcement_policy]
## GRPC permit keepalives without calls, set to true if your clients are
## sending pings without calls in-flight. This can sometimes happen on IOS-XE
## devices where the GRPC connection is left open but subscriptions have been
## removed, and adding subsequent subscriptions does not keep a stable session.
# permit_keepalive_without_calls = false
## GRPC minimum timeout between successive pings, decreasing this value may
## help if this plugin is closing connections with ENHANCE_YOUR_CALM (too_many_pings).
# keepalive_minimum_time = "5m"
```
## Metrics
Metrics are named by the encoding path that generated the data, or by the alias
if the `inputs.cisco_telemetry_mdt.aliases` config section is defined.
Metric fields are dependent on the device type and path.
Tags included in all metrics:
- source
- path
- subscription
Additional tags (such as interface_name) may be included depending on the path.
## Example Output
```shell

View File

@ -20,10 +20,12 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
_ "google.golang.org/grpc/encoding/gzip" // Required to allow gzip encoding
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
@ -38,15 +40,25 @@ const (
tcpMaxMsgLen uint32 = 1024 * 1024
)
// default minimum time between successive pings
// this value is specified in the GRPC docs via GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS
const defaultKeepaliveMinTime = config.Duration(time.Second * 300)
type GRPCEnforcementPolicy struct {
PermitKeepaliveWithoutCalls bool `toml:"permit_keepalive_without_calls"`
KeepaliveMinTime config.Duration `toml:"keepalive_minimum_time"`
}
// CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms
type CiscoTelemetryMDT struct {
// Common configuration
Transport string
ServiceAddress string `toml:"service_address"`
MaxMsgSize int `toml:"max_msg_size"`
Aliases map[string]string `toml:"aliases"`
Dmes map[string]string `toml:"dmes"`
EmbeddedTags []string `toml:"embedded_tags"`
Transport string
ServiceAddress string `toml:"service_address"`
MaxMsgSize int `toml:"max_msg_size"`
Aliases map[string]string `toml:"aliases"`
Dmes map[string]string `toml:"dmes"`
EmbeddedTags []string `toml:"embedded_tags"`
EnforcementPolicy GRPCEnforcementPolicy `toml:"grpc_enforcement_policy"`
Log telegraf.Logger
@ -177,6 +189,14 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
opts = append(opts, grpc.MaxRecvMsgSize(c.MaxMsgSize))
}
if c.EnforcementPolicy.PermitKeepaliveWithoutCalls || (c.EnforcementPolicy.KeepaliveMinTime != 0 && c.EnforcementPolicy.KeepaliveMinTime != defaultKeepaliveMinTime) {
// Only set if either parameter does not match defaults
opts = append(opts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Duration(c.EnforcementPolicy.KeepaliveMinTime),
PermitWithoutStream: c.EnforcementPolicy.PermitKeepaliveWithoutCalls,
}))
}
c.grpcServer = grpc.NewServer(opts...)
dialout.RegisterGRPCMdtDialoutServer(c.grpcServer, c)

View File

@ -12,6 +12,7 @@ import (
telemetryBis "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
"github.com/influxdata/telegraf/testutil"
@ -699,7 +700,7 @@ func TestGRPCDialoutError(t *testing.T) {
require.NoError(t, err)
addr := c.Address()
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
client := dialout.NewGRPCMdtDialoutClient(conn)
stream, err := client.MdtDialout(context.Background())
@ -725,7 +726,7 @@ func TestGRPCDialoutMultiple(t *testing.T) {
telemetry := mockTelemetryMessage()
addr := c.Address()
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
require.NoError(t, err)
client := dialout.NewGRPCMdtDialoutClient(conn)
stream, err := client.MdtDialout(context.TODO())
@ -736,7 +737,7 @@ func TestGRPCDialoutMultiple(t *testing.T) {
args := &dialout.MdtDialoutArgs{Data: data, ReqId: 456}
require.NoError(t, stream.Send(args))
conn2, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
conn2, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
require.NoError(t, err)
client2 := dialout.NewGRPCMdtDialoutClient(conn2)
stream2, err := client2.MdtDialout(context.TODO())
@ -778,3 +779,29 @@ func TestGRPCDialoutMultiple(t *testing.T) {
fields = map[string]interface{}{"value": int64(-1)}
acc.AssertContainsTaggedFields(t, "other", fields, tags)
}
func TestGRPCDialoutKeepalive(t *testing.T) {
c := &CiscoTelemetryMDT{Log: testutil.Logger{}, Transport: "grpc", ServiceAddress: "127.0.0.1:0", EnforcementPolicy: GRPCEnforcementPolicy{
PermitKeepaliveWithoutCalls: true,
KeepaliveMinTime: 0,
}}
acc := &testutil.Accumulator{}
err := c.Start(acc)
require.NoError(t, err)
addr := c.Address()
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
client := dialout.NewGRPCMdtDialoutClient(conn)
stream, err := client.MdtDialout(context.Background())
require.NoError(t, err)
telemetry := mockTelemetryMessage()
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
args := &dialout.MdtDialoutArgs{Data: data, ReqId: 456}
require.NoError(t, stream.Send(args))
c.Stop()
require.NoError(t, conn.Close())
}

View File

@ -38,3 +38,15 @@
# dnpath = '{"Name": "show ip route summary","prop": [{"Key": "routes","Value": "string"}, {"Key": "best-paths","Value": "string"}]}'
# dnpath2 = '{"Name": "show processes cpu","prop": [{"Key": "kernel_percent","Value": "float"}, {"Key": "idle_percent","Value": "float"}, {"Key": "process","Value": "string"}, {"Key": "user_percent","Value": "float"}, {"Key": "onesec","Value": "float"}]}'
# dnpath3 = '{"Name": "show processes memory physical","prop": [{"Key": "processname","Value": "string"}]}'
## Additional GRPC connection settings.
[inputs.cisco_telemetry_mdt.grpc_enforcement_policy]
## GRPC permit keepalives without calls, set to true if your clients are
## sending pings without calls in-flight. This can sometimes happen on IOS-XE
## devices where the GRPC connection is left open but subscriptions have been
## removed, and adding subsequent subscriptions does not keep a stable session.
# permit_keepalive_without_calls = false
## GRPC minimum timeout between successive pings, decreasing this value may
## help if this plugin is closing connections with ENHANCE_YOUR_CALM (too_many_pings).
# keepalive_minimum_time = "5m"