From ae98e7f1f6192fb1f46090bd9c46507a243b4861 Mon Sep 17 00:00:00 2001 From: zhiyuan-mojie <351843010@qq.com> Date: Thu, 26 May 2022 02:58:12 +0800 Subject: [PATCH] fix: redis plugin goroutine leak triggered by auto reload config mechanism (#11143) --- plugins/inputs/redis/redis.go | 21 ++++++++++++++++++++- plugins/inputs/redis/redis_test.go | 4 ++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 7bbb0bf4d..9e3b64a2f 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -37,7 +37,7 @@ type Redis struct { Password string tls.ClientConfig - Log telegraf.Logger + Log telegraf.Logger `toml:"-"` clients []Client connected bool @@ -47,6 +47,7 @@ type Client interface { Do(returnType string, args ...interface{}) (interface{}, error) Info() *redis.StringCmd BaseTags() map[string]string + Close() error } type RedisClient struct { @@ -192,6 +193,10 @@ func (r *RedisClient) BaseTags() map[string]string { return tags } +func (r *RedisClient) Close() error { + return r.client.Close() +} + var replicationSlaveMetricPrefix = regexp.MustCompile(`^slave\d+`) var Tracking = map[string]string{ @@ -697,3 +702,17 @@ func coerceType(value interface{}, typ reflect.Type) reflect.Value { } return reflect.ValueOf(value) } + +func (r *Redis) Start(telegraf.Accumulator) error { + return nil +} + +//Stop close the client through ServiceInput interface Start/Stop methods impl. +func (r *Redis) Stop() { + for _, c := range r.clients { + err := c.Close() + if err != nil { + r.Log.Errorf("error closing client: %v", err) + } + } +} diff --git a/plugins/inputs/redis/redis_test.go b/plugins/inputs/redis/redis_test.go index 06332d63b..f363ec0ea 100644 --- a/plugins/inputs/redis/redis_test.go +++ b/plugins/inputs/redis/redis_test.go @@ -29,6 +29,10 @@ func (t *testClient) Do(_ string, _ ...interface{}) (interface{}, error) { return 2, nil } +func (t *testClient) Close() error { + return nil +} + func TestRedisConnectIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode")