fix(input.redis_sentinel): fix sentinel and replica stats gathering (#12229)

This commit is contained in:
Petar Obradović 2022-11-16 21:43:30 +01:00 committed by GitHub
parent d2268f04d0
commit 61f64506c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 16 deletions

View File

@ -243,6 +243,7 @@ func (client *RedisSentinelClient) gatherMasterStats(acc telegraf.Accumulator) (
if !ok {
return masterNames, fmt.Errorf("unable to resolve master name")
}
masterNames = append(masterNames, masterName)
quorumCmd := redis.NewStringCmd("sentinel", "ckquorum", masterName)
quorumErr := client.sentinel.Process(quorumCmd)

View File

@ -3,6 +3,7 @@ package redis_sentinel
import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"testing"
@ -10,6 +11,7 @@ import (
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
@ -17,32 +19,64 @@ import (
)
const masterName = "mymaster"
const networkName = "telegraf-test-redis-sentinel"
const sentinelServicePort = "26379"
func TestRedisSentinelConnectIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
servicePort := "6379"
container := testutil.Container{
Image: "redis:alpine",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer container.Terminate()
ctx := context.Background()
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()
addr := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
redis := createRedisContainer()
err = redis.Start()
require.NoError(t, err, "failed to start container")
defer redis.Terminate()
firstSentinel := createSentinelContainer(redis.Name, wait.ForAll(
wait.ForLog("+monitor master"),
wait.ForListeningPort(nat.Port(sentinelServicePort)),
))
err = firstSentinel.Start()
require.NoError(t, err, "failed to start container")
defer firstSentinel.Terminate()
secondSentinel := createSentinelContainer(redis.Name, wait.ForAll(
wait.ForLog("+sentinel sentinel"),
wait.ForListeningPort(nat.Port(sentinelServicePort)),
))
err = secondSentinel.Start()
require.NoError(t, err, "failed to start container")
defer secondSentinel.Terminate()
addr := fmt.Sprintf("tcp://%s:%s", secondSentinel.Address, secondSentinel.Ports[sentinelServicePort])
r := &RedisSentinel{
Servers: []string{addr},
}
err = r.Init()
require.NoError(t, err, "failed to run Init function")
var acc testutil.Accumulator
err = acc.GatherError(r.Gather)
require.NoError(t, err)
require.True(t, acc.HasMeasurement("redis_sentinel_masters"), "redis_sentinel_masters measurement is missing")
require.True(t, acc.HasMeasurement("redis_sentinel_sentinels"), "redis_sentinel_sentinels measurement is missing")
require.True(t, acc.HasMeasurement("redis_sentinel"), "redis_sentinel measurement is missing")
}
func TestRedisSentinelMasters(t *testing.T) {
@ -109,8 +143,8 @@ func TestRedisSentinelMasters(t *testing.T) {
"runid": "ff3dadd1cfea3043de4d25711d93f01a564562f7",
}
sentinelTags, sentinelFields, sentinalErr := convertSentinelMastersOutput(globalTags, sentinelMastersOutput, nil)
require.NoErrorf(t, sentinalErr, "failed converting output: %v", sentinalErr)
sentinelTags, sentinelFields, sentinelErr := convertSentinelMastersOutput(globalTags, sentinelMastersOutput, nil)
require.NoErrorf(t, sentinelErr, "failed converting output: %v", sentinelErr)
actualMetrics := []telegraf.Metric{
testutil.MustMetric(measurementMasters, sentinelTags, sentinelFields, now),
@ -231,8 +265,8 @@ func TestRedisSentinelReplicas(t *testing.T) {
"slave_repl_offset": "1392400",
}
sentinelTags, sentinelFields, sentinalErr := convertSentinelReplicaOutput(globalTags, masterName, replicasOutput)
require.NoErrorf(t, sentinalErr, "failed converting output: %v", sentinalErr)
sentinelTags, sentinelFields, sentinelErr := convertSentinelReplicaOutput(globalTags, masterName, replicasOutput)
require.NoErrorf(t, sentinelErr, "failed converting output: %v", sentinelErr)
actualMetrics := []telegraf.Metric{
testutil.MustMetric(measurementReplicas, sentinelTags, sentinelFields, now),
@ -312,8 +346,8 @@ func TestRedisSentinelInfoAll(t *testing.T) {
rdr := bufio.NewReader(bytes.NewReader(sentinelInfoResponse))
sentinelTags, sentinelFields, sentinalErr := convertSentinelInfoOutput(globalTags, rdr)
require.NoErrorf(t, sentinalErr, "failed converting output: %v", sentinalErr)
sentinelTags, sentinelFields, sentinelErr := convertSentinelInfoOutput(globalTags, rdr)
require.NoErrorf(t, sentinelErr, "failed converting output: %v", sentinelErr)
actualMetrics := []telegraf.Metric{
testutil.MustMetric(measurementSentinel, sentinelTags, sentinelFields, now),
@ -321,3 +355,27 @@ func TestRedisSentinelInfoAll(t *testing.T) {
testutil.RequireMetricsEqual(t, expectedMetrics, actualMetrics)
}
func createRedisContainer() testutil.Container {
return testutil.Container{
Image: "redis:7.0-alpine",
Name: "telegraf-test-redis-sentinel-redis",
Networks: []string{networkName},
WaitingFor: wait.ForAll(
wait.ForLog("Ready to accept connections"),
wait.ForExposedPort(),
),
}
}
func createSentinelContainer(redisAddress string, waitingFor wait.Strategy) testutil.Container {
return testutil.Container{
Image: "bitnami/redis-sentinel:7.0",
ExposedPorts: []string{sentinelServicePort},
Networks: []string{networkName},
Env: map[string]string{
"REDIS_MASTER_HOST": redisAddress,
},
WaitingFor: waitingFor,
}
}