From 5b9f7e7bf317a896756a3a9106565f1089106fe1 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Mon, 18 May 2015 09:26:10 -0700 Subject: [PATCH] Add ability to query many redis servers --- plugins/redis/{gatherer.go => redis.go} | 40 +++++++++-- plugins/redis/redis_test.go | 93 ++++++++++++++++++++++++- 2 files changed, 127 insertions(+), 6 deletions(-) rename plugins/redis/{gatherer.go => redis.go} (81%) diff --git a/plugins/redis/gatherer.go b/plugins/redis/redis.go similarity index 81% rename from plugins/redis/gatherer.go rename to plugins/redis/redis.go index 1acb5820a..22a9ce0d6 100644 --- a/plugins/redis/gatherer.go +++ b/plugins/redis/redis.go @@ -7,13 +7,15 @@ import ( "net" "strconv" "strings" + "sync" "github.com/influxdb/tivan/plugins" ) -type Gatherer struct { +type Redis struct { Disabled bool Address string + Servers []string c net.Conn buf []byte @@ -54,13 +56,41 @@ var Tracking = map[string]string{ var ErrProtocolError = errors.New("redis protocol error") -func (g *Gatherer) Gather(acc plugins.Accumulator) error { - if g.Address == "" || g.Disabled { +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (g *Redis) Gather(acc plugins.Accumulator) error { + if g.Disabled { return nil } + var wg sync.WaitGroup + + var outerr error + + var servers []string + + if g.Address != "" { + servers = append(servers, g.Address) + } + + servers = append(servers, g.Servers...) + + for _, serv := range servers { + wg.Add(1) + go func(serv string) { + defer wg.Done() + outerr = g.gatherServer(serv, acc) + }(serv) + } + + wg.Wait() + + return outerr +} + +func (g *Redis) gatherServer(addr string, acc plugins.Accumulator) error { if g.c == nil { - c, err := net.Dial("tcp", g.Address) + c, err := net.Dial("tcp", addr) if err != nil { return err } @@ -134,6 +164,6 @@ func (g *Gatherer) Gather(acc plugins.Accumulator) error { func init() { plugins.Add("redis", func() plugins.Plugin { - return &Gatherer{} + return &Redis{} }) } diff --git a/plugins/redis/redis_test.go b/plugins/redis/redis_test.go index b7d3af11c..a2252d74b 100644 --- a/plugins/redis/redis_test.go +++ b/plugins/redis/redis_test.go @@ -42,7 +42,7 @@ func TestRedisGeneratesMetrics(t *testing.T) { addr := l.Addr().String() - r := &Gatherer{ + r := &Redis{ Address: addr, } @@ -102,6 +102,97 @@ func TestRedisGeneratesMetrics(t *testing.T) { } } +func TestRedisCanPullStatsFromMultipleServers(t *testing.T) { + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + + defer l.Close() + + go func() { + c, err := l.Accept() + if err != nil { + return + } + + buf := bufio.NewReader(c) + + for { + line, err := buf.ReadString('\n') + if err != nil { + return + } + + if line != "info\n" { + return + } + + fmt.Fprintf(c, "$%d\n", len(testOutput)) + c.Write([]byte(testOutput)) + } + }() + + addr := l.Addr().String() + + r := &Redis{ + Servers: []string{addr}, + } + + var acc testutil.Accumulator + + err = r.Gather(&acc) + require.NoError(t, err) + + checkInt := []struct { + name string + value uint64 + }{ + {"redis_uptime", 238}, + {"redis_clients", 1}, + {"redis_used_memory", 1003936}, + {"redis_used_memory_rss", 811008}, + {"redis_used_memory_peak", 1003936}, + {"redis_used_memory_lua", 33792}, + {"redis_rdb_changes_since_last_save", 0}, + {"redis_total_connections_received", 2}, + {"redis_total_commands_processed", 1}, + {"redis_instantaneous_ops_per_sec", 0}, + {"redis_sync_full", 0}, + {"redis_sync_partial_ok", 0}, + {"redis_sync_partial_err", 0}, + {"redis_expired_keys", 0}, + {"redis_evicted_keys", 0}, + {"redis_keyspace_hits", 0}, + {"redis_keyspace_misses", 0}, + {"redis_pubsub_channels", 0}, + {"redis_pubsub_patterns", 0}, + {"redis_latest_fork_usec", 0}, + {"redis_connected_slaves", 0}, + {"redis_master_repl_offset", 0}, + {"redis_repl_backlog_active", 0}, + {"redis_repl_backlog_size", 1048576}, + {"redis_repl_backlog_histlen", 0}, + } + + for _, c := range checkInt { + assert.NoError(t, acc.ValidateValue(c.name, c.value)) + } + + checkFloat := []struct { + name string + value float64 + }{ + {"redis_mem_fragmentation_ratio", 0.81}, + {"redis_used_cpu_sys", 0.14}, + {"redis_used_cpu_user", 0.05}, + {"redis_used_cpu_sys_children", 0.00}, + {"redis_used_cpu_user_children", 0.00}, + } + + for _, c := range checkFloat { + assert.NoError(t, acc.ValidateValue(c.name, c.value)) + } +} + const testOutput = `# Server redis_version:2.8.9 redis_git_sha1:00000000