Add set, and histogram reporting to aerospike telegraf plugin (#8025)
Co-authored-by: Joshua Gross <joshua.gross@indexexchange.com>
This commit is contained in:
parent
ac809e9e5b
commit
4ebb8c7820
|
|
@ -2,9 +2,12 @@ version: '3'
|
||||||
|
|
||||||
services:
|
services:
|
||||||
aerospike:
|
aerospike:
|
||||||
image: aerospike/aerospike-server:3.9.0
|
image: aerospike/aerospike-server:4.9.0.11
|
||||||
ports:
|
ports:
|
||||||
- "3000:3000"
|
- "3000:3000"
|
||||||
|
- "3001:3001"
|
||||||
|
- "3002:3002"
|
||||||
|
- "3003:3003"
|
||||||
zookeeper:
|
zookeeper:
|
||||||
image: wurstmeister/zookeeper
|
image: wurstmeister/zookeeper
|
||||||
environment:
|
environment:
|
||||||
|
|
|
||||||
File diff suppressed because one or more lines are too long
|
|
@ -2,6 +2,8 @@ package aerospike
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -27,6 +29,17 @@ type Aerospike struct {
|
||||||
|
|
||||||
initialized bool
|
initialized bool
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
|
|
||||||
|
DisableQueryNamespaces bool `toml:"disable_query_namespaces"`
|
||||||
|
Namespaces []string `toml:"namespaces"`
|
||||||
|
|
||||||
|
QuerySets bool `toml:"query_sets"`
|
||||||
|
Sets []string `toml:"sets"`
|
||||||
|
|
||||||
|
EnableTTLHistogram bool `toml:"enable_ttl_histogram"`
|
||||||
|
EnableObjectSizeLinearHistogram bool `toml:"enable_object_size_linear_histogram"`
|
||||||
|
|
||||||
|
NumberHistogramBuckets int `toml:"num_histogram_buckets"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
|
|
@ -45,7 +58,28 @@ var sampleConfig = `
|
||||||
# tls_key = "/etc/telegraf/key.pem"
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
## If false, skip chain & host verification
|
## If false, skip chain & host verification
|
||||||
# insecure_skip_verify = true
|
# insecure_skip_verify = true
|
||||||
`
|
|
||||||
|
# Feature Options
|
||||||
|
# Add namespace variable to limit the namespaces executed on
|
||||||
|
# Leave blank to do all
|
||||||
|
# disable_query_namespaces = true # default false
|
||||||
|
# namespaces = ["namespace1", "namespace2"]
|
||||||
|
|
||||||
|
# Enable set level telmetry
|
||||||
|
# query_sets = true # default: false
|
||||||
|
# Add namespace set combinations to limit sets executed on
|
||||||
|
# Leave blank to do all sets
|
||||||
|
# sets = ["namespace1/set1", "namespace1/set2", "namespace3"]
|
||||||
|
|
||||||
|
# Histograms
|
||||||
|
# enable_ttl_histogram = true # default: false
|
||||||
|
# enable_object_size_linear_histogram = true # default: false
|
||||||
|
|
||||||
|
# by default, aerospike produces a 100 bucket histogram
|
||||||
|
# this is not great for most graphing tools, this will allow
|
||||||
|
# the ability to squash this to a smaller number of buckets
|
||||||
|
# num_histogram_buckets = 100 # default: 10
|
||||||
|
`
|
||||||
|
|
||||||
func (a *Aerospike) SampleConfig() string {
|
func (a *Aerospike) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
|
|
@ -68,6 +102,14 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
||||||
a.initialized = true
|
a.initialized = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if a.NumberHistogramBuckets == 0 {
|
||||||
|
a.NumberHistogramBuckets = 10
|
||||||
|
} else if a.NumberHistogramBuckets > 100 {
|
||||||
|
a.NumberHistogramBuckets = 100
|
||||||
|
} else if a.NumberHistogramBuckets < 1 {
|
||||||
|
a.NumberHistogramBuckets = 10
|
||||||
|
}
|
||||||
|
|
||||||
if len(a.Servers) == 0 {
|
if len(a.Servers) == 0 {
|
||||||
return a.gatherServer("127.0.0.1:3000", acc)
|
return a.gatherServer("127.0.0.1:3000", acc)
|
||||||
}
|
}
|
||||||
|
|
@ -85,8 +127,8 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error {
|
func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) error {
|
||||||
host, port, err := net.SplitHostPort(hostport)
|
host, port, err := net.SplitHostPort(hostPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -108,53 +150,325 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
|
||||||
|
|
||||||
nodes := c.GetNodes()
|
nodes := c.GetNodes()
|
||||||
for _, n := range nodes {
|
for _, n := range nodes {
|
||||||
tags := map[string]string{
|
stats, err := a.getNodeInfo(n)
|
||||||
"aerospike_host": hostport,
|
|
||||||
"node_name": n.GetName(),
|
|
||||||
}
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
stats, err := as.RequestNodeStats(n)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for k, v := range stats {
|
a.parseNodeInfo(stats, hostPort, n.GetName(), acc)
|
||||||
val := parseValue(v)
|
|
||||||
fields[strings.Replace(k, "-", "_", -1)] = val
|
|
||||||
}
|
|
||||||
acc.AddFields("aerospike_node", fields, tags, time.Now())
|
|
||||||
|
|
||||||
info, err := as.RequestNodeInfo(n, "namespaces")
|
namespaces, err := a.getNamespaces(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
namespaces := strings.Split(info["namespaces"], ";")
|
|
||||||
|
|
||||||
for _, namespace := range namespaces {
|
if !a.DisableQueryNamespaces {
|
||||||
nTags := map[string]string{
|
// Query Namespaces
|
||||||
"aerospike_host": hostport,
|
for _, namespace := range namespaces {
|
||||||
"node_name": n.GetName(),
|
stats, err = a.getNamespaceInfo(namespace, n)
|
||||||
}
|
|
||||||
nTags["namespace"] = namespace
|
if err != nil {
|
||||||
nFields := make(map[string]interface{})
|
|
||||||
info, err := as.RequestNodeInfo(n, "namespace/"+namespace)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
stats := strings.Split(info["namespace/"+namespace], ";")
|
|
||||||
for _, stat := range stats {
|
|
||||||
parts := strings.Split(stat, "=")
|
|
||||||
if len(parts) < 2 {
|
|
||||||
continue
|
continue
|
||||||
|
} else {
|
||||||
|
a.parseNamespaceInfo(stats, hostPort, namespace, n.GetName(), acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.EnableTTLHistogram {
|
||||||
|
err = a.getTTLHistogram(hostPort, namespace, "", n, acc)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if a.EnableObjectSizeLinearHistogram {
|
||||||
|
err = a.getObjectSizeLinearHistogram(hostPort, namespace, "", n, acc)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.QuerySets {
|
||||||
|
namespaceSets, err := a.getSets(n)
|
||||||
|
if err == nil {
|
||||||
|
for _, namespaceSet := range namespaceSets {
|
||||||
|
namespace, set := splitNamespaceSet(namespaceSet)
|
||||||
|
|
||||||
|
stats, err := a.getSetInfo(namespaceSet, n)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
a.parseSetInfo(stats, hostPort, namespaceSet, n.GetName(), acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.EnableTTLHistogram {
|
||||||
|
err = a.getTTLHistogram(hostPort, namespace, set, n, acc)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.EnableObjectSizeLinearHistogram {
|
||||||
|
err = a.getObjectSizeLinearHistogram(hostPort, namespace, set, n, acc)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
val := parseValue(parts[1])
|
|
||||||
nFields[strings.Replace(parts[0], "-", "_", -1)] = val
|
|
||||||
}
|
}
|
||||||
acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getNodeInfo(n *as.Node) (map[string]string, error) {
|
||||||
|
stats, err := as.RequestNodeStats(n)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) parseNodeInfo(stats map[string]string, hostPort string, nodeName string, acc telegraf.Accumulator) {
|
||||||
|
tags := map[string]string{
|
||||||
|
"aerospike_host": hostPort,
|
||||||
|
"node_name": nodeName,
|
||||||
|
}
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
|
||||||
|
for k, v := range stats {
|
||||||
|
val := parseValue(v)
|
||||||
|
fields[strings.Replace(k, "-", "_", -1)] = val
|
||||||
|
}
|
||||||
|
acc.AddFields("aerospike_node", fields, tags, time.Now())
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getNamespaces(n *as.Node) ([]string, error) {
|
||||||
|
var namespaces []string
|
||||||
|
if len(a.Namespaces) <= 0 {
|
||||||
|
info, err := as.RequestNodeInfo(n, "namespaces")
|
||||||
|
if err != nil {
|
||||||
|
return namespaces, err
|
||||||
|
}
|
||||||
|
namespaces = strings.Split(info["namespaces"], ";")
|
||||||
|
} else {
|
||||||
|
namespaces = a.Namespaces
|
||||||
|
}
|
||||||
|
|
||||||
|
return namespaces, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getNamespaceInfo(namespace string, n *as.Node) (map[string]string, error) {
|
||||||
|
stats, err := as.RequestNodeInfo(n, "namespace/"+namespace)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats, err
|
||||||
|
}
|
||||||
|
func (a *Aerospike) parseNamespaceInfo(stats map[string]string, hostPort string, namespace string, nodeName string, acc telegraf.Accumulator) {
|
||||||
|
|
||||||
|
nTags := map[string]string{
|
||||||
|
"aerospike_host": hostPort,
|
||||||
|
"node_name": nodeName,
|
||||||
|
}
|
||||||
|
nTags["namespace"] = namespace
|
||||||
|
nFields := make(map[string]interface{})
|
||||||
|
|
||||||
|
stat := strings.Split(stats["namespace/"+namespace], ";")
|
||||||
|
for _, pair := range stat {
|
||||||
|
parts := strings.Split(pair, "=")
|
||||||
|
if len(parts) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
val := parseValue(parts[1])
|
||||||
|
nFields[strings.Replace(parts[0], "-", "_", -1)] = val
|
||||||
|
}
|
||||||
|
acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getSets(n *as.Node) ([]string, error) {
|
||||||
|
var namespaceSets []string
|
||||||
|
// Gather all sets
|
||||||
|
if len(a.Sets) <= 0 {
|
||||||
|
stats, err := as.RequestNodeInfo(n, "sets")
|
||||||
|
if err != nil {
|
||||||
|
return namespaceSets, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stat := strings.Split(stats["sets"], ";")
|
||||||
|
for _, setStats := range stat {
|
||||||
|
// setInfo is "ns=test:set=foo:objects=1:tombstones=0"
|
||||||
|
if len(setStats) > 0 {
|
||||||
|
pairs := strings.Split(setStats, ":")
|
||||||
|
var ns, set string
|
||||||
|
for _, pair := range pairs {
|
||||||
|
parts := strings.Split(pair, "=")
|
||||||
|
if len(parts) == 2 {
|
||||||
|
if parts[0] == "ns" {
|
||||||
|
ns = parts[1]
|
||||||
|
}
|
||||||
|
if parts[0] == "set" {
|
||||||
|
set = parts[1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(ns) > 0 && len(set) > 0 {
|
||||||
|
namespaceSets = append(namespaceSets, fmt.Sprintf("%s/%s", ns, set))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else { // User has passed in sets
|
||||||
|
namespaceSets = a.Sets
|
||||||
|
}
|
||||||
|
|
||||||
|
return namespaceSets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getSetInfo(namespaceSet string, n *as.Node) (map[string]string, error) {
|
||||||
|
stats, err := as.RequestNodeInfo(n, "sets/"+namespaceSet)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) parseSetInfo(stats map[string]string, hostPort string, namespaceSet string, nodeName string, acc telegraf.Accumulator) {
|
||||||
|
|
||||||
|
stat := strings.Split(
|
||||||
|
strings.TrimSuffix(
|
||||||
|
stats[fmt.Sprintf("sets/%s", namespaceSet)], ";"), ":")
|
||||||
|
nTags := map[string]string{
|
||||||
|
"aerospike_host": hostPort,
|
||||||
|
"node_name": nodeName,
|
||||||
|
"set": namespaceSet,
|
||||||
|
}
|
||||||
|
nFields := make(map[string]interface{})
|
||||||
|
for _, part := range stat {
|
||||||
|
pieces := strings.Split(part, "=")
|
||||||
|
if len(pieces) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
val := parseValue(pieces[1])
|
||||||
|
nFields[strings.Replace(pieces[0], "-", "_", -1)] = val
|
||||||
|
}
|
||||||
|
acc.AddFields("aerospike_set", nFields, nTags, time.Now())
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getTTLHistogram(hostPort string, namespace string, set string, n *as.Node, acc telegraf.Accumulator) error {
|
||||||
|
stats, err := a.getHistogram(namespace, set, "ttl", n)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
a.parseHistogram(stats, hostPort, namespace, set, "ttl", n.GetName(), acc)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getObjectSizeLinearHistogram(hostPort string, namespace string, set string, n *as.Node, acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
|
stats, err := a.getHistogram(namespace, set, "object-size-linear", n)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
a.parseHistogram(stats, hostPort, namespace, set, "object-size-linear", n.GetName(), acc)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) getHistogram(namespace string, set string, histogramType string, n *as.Node) (map[string]string, error) {
|
||||||
|
var queryArg string
|
||||||
|
if len(set) > 0 {
|
||||||
|
queryArg = fmt.Sprintf("histogram:type=%s;namespace=%v;set=%v", histogramType, namespace, set)
|
||||||
|
} else {
|
||||||
|
queryArg = fmt.Sprintf("histogram:type=%s;namespace=%v", histogramType, namespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
stats, err := as.RequestNodeInfo(n, queryArg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Aerospike) parseHistogram(stats map[string]string, hostPort string, namespace string, set string, histogramType string, nodeName string, acc telegraf.Accumulator) {
|
||||||
|
|
||||||
|
nTags := map[string]string{
|
||||||
|
"aerospike_host": hostPort,
|
||||||
|
"node_name": nodeName,
|
||||||
|
"namespace": namespace,
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(set) > 0 {
|
||||||
|
nTags["set"] = set
|
||||||
|
}
|
||||||
|
|
||||||
|
nFields := make(map[string]interface{})
|
||||||
|
|
||||||
|
for _, stat := range stats {
|
||||||
|
for _, part := range strings.Split(stat, ":") {
|
||||||
|
pieces := strings.Split(part, "=")
|
||||||
|
if len(pieces) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if pieces[0] == "buckets" {
|
||||||
|
buckets := strings.Split(pieces[1], ",")
|
||||||
|
|
||||||
|
// Normalize incase of less buckets than expected
|
||||||
|
numRecordsPerBucket := 1
|
||||||
|
if len(buckets) > a.NumberHistogramBuckets {
|
||||||
|
numRecordsPerBucket = int(math.Ceil((float64(len(buckets)) / float64(a.NumberHistogramBuckets))))
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketCount := 0
|
||||||
|
bucketSum := int64(0) // cast to int64, as can have large object sums
|
||||||
|
bucketName := 0
|
||||||
|
for i, bucket := range buckets {
|
||||||
|
// Sum records and increment bucket collection counter
|
||||||
|
if bucketCount < numRecordsPerBucket {
|
||||||
|
bucketSum = bucketSum + parseValue(bucket).(int64)
|
||||||
|
bucketCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store records and reset counters
|
||||||
|
// increment bucket name
|
||||||
|
if bucketCount == numRecordsPerBucket {
|
||||||
|
nFields[strconv.Itoa(bucketName)] = bucketSum
|
||||||
|
|
||||||
|
bucketCount = 0
|
||||||
|
bucketSum = 0
|
||||||
|
bucketName++
|
||||||
|
} else if i == (len(buckets) - 1) {
|
||||||
|
// base/edge case where final bucket does not fully
|
||||||
|
// fill number of records per bucket
|
||||||
|
nFields[strconv.Itoa(bucketName)] = bucketSum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields(fmt.Sprintf("aerospike_histogram_%v", strings.Replace(histogramType, "-", "_", -1)), nFields, nTags, time.Now())
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitNamespaceSet(namespaceSet string) (string, string) {
|
||||||
|
split := strings.Split(namespaceSet, "/")
|
||||||
|
return split[0], split[1]
|
||||||
|
}
|
||||||
|
|
||||||
func parseValue(v string) interface{} {
|
func parseValue(v string) interface{} {
|
||||||
if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
|
if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
|
||||||
return parsed
|
return parsed
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package aerospike
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
as "github.com/aerospike/aerospike-client-go"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
@ -26,7 +27,11 @@ func TestAerospikeStatistics(t *testing.T) {
|
||||||
assert.True(t, acc.HasTag("aerospike_node", "node_name"))
|
assert.True(t, acc.HasTag("aerospike_node", "node_name"))
|
||||||
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
||||||
assert.True(t, acc.HasTag("aerospike_namespace", "node_name"))
|
assert.True(t, acc.HasTag("aerospike_namespace", "node_name"))
|
||||||
assert.True(t, acc.HasInt64Field("aerospike_node", "batch_error"))
|
assert.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error"))
|
||||||
|
|
||||||
|
namespaceName := acc.TagValue("aerospike_namespace", "namespace")
|
||||||
|
assert.Equal(t, namespaceName, "test")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAerospikeStatisticsPartialErr(t *testing.T) {
|
func TestAerospikeStatisticsPartialErr(t *testing.T) {
|
||||||
|
|
@ -42,19 +47,419 @@ func TestAerospikeStatisticsPartialErr(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
err := acc.GatherError(a.Gather)
|
||||||
|
|
||||||
require.Error(t, acc.GatherError(a.Gather))
|
require.Error(t, err)
|
||||||
|
|
||||||
assert.True(t, acc.HasMeasurement("aerospike_node"))
|
assert.True(t, acc.HasMeasurement("aerospike_node"))
|
||||||
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
||||||
assert.True(t, acc.HasInt64Field("aerospike_node", "batch_error"))
|
assert.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error"))
|
||||||
|
namespaceName := acc.TagSetValue("aerospike_namespace", "namespace")
|
||||||
|
assert.Equal(t, namespaceName, "test")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSelectNamepsaces(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Select nonexistent namespace
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{testutil.GetLocalHost() + ":3000"},
|
||||||
|
Namespaces: []string{"notTest"},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err := acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_node"))
|
||||||
|
assert.True(t, acc.HasTag("aerospike_node", "node_name"))
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
||||||
|
assert.True(t, acc.HasTag("aerospike_namespace", "node_name"))
|
||||||
|
|
||||||
|
// Expect only 1 namespace
|
||||||
|
count := 0
|
||||||
|
for _, p := range acc.Metrics {
|
||||||
|
if p.Measurement == "aerospike_namespace" {
|
||||||
|
count += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.Equal(t, count, 1)
|
||||||
|
|
||||||
|
// expect namespace to have no fields as nonexistent
|
||||||
|
assert.False(t, acc.HasInt64Field("aerospke_namespace", "appeals_tx_remaining"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDisableQueryNamespaces(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
},
|
||||||
|
DisableQueryNamespaces: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_node"))
|
||||||
|
assert.False(t, acc.HasMeasurement("aerospike_namespace"))
|
||||||
|
|
||||||
|
a.DisableQueryNamespaces = false
|
||||||
|
err = acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_node"))
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQuerySets(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a set
|
||||||
|
// test is the default namespace from aerospike
|
||||||
|
policy := as.NewClientPolicy()
|
||||||
|
client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000)
|
||||||
|
|
||||||
|
key, err := as.NewKey("test", "foo", 123)
|
||||||
|
require.NoError(t, err)
|
||||||
|
bins := as.BinMap{
|
||||||
|
"e": 2,
|
||||||
|
"pi": 3,
|
||||||
|
}
|
||||||
|
err = client.Add(nil, key, bins)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
key, err = as.NewKey("test", "bar", 1234)
|
||||||
|
require.NoError(t, err)
|
||||||
|
bins = as.BinMap{
|
||||||
|
"e": 2,
|
||||||
|
"pi": 3,
|
||||||
|
}
|
||||||
|
err = client.Add(nil, key, bins)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
},
|
||||||
|
QuerySets: true,
|
||||||
|
DisableQueryNamespaces: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err = acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo"))
|
||||||
|
assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar"))
|
||||||
|
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_set"))
|
||||||
|
assert.True(t, acc.HasTag("aerospike_set", "set"))
|
||||||
|
assert.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes"))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSelectQuerySets(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a set
|
||||||
|
// test is the default namespace from aerospike
|
||||||
|
policy := as.NewClientPolicy()
|
||||||
|
client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000)
|
||||||
|
|
||||||
|
key, err := as.NewKey("test", "foo", 123)
|
||||||
|
require.NoError(t, err)
|
||||||
|
bins := as.BinMap{
|
||||||
|
"e": 2,
|
||||||
|
"pi": 3,
|
||||||
|
}
|
||||||
|
err = client.Add(nil, key, bins)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
key, err = as.NewKey("test", "bar", 1234)
|
||||||
|
require.NoError(t, err)
|
||||||
|
bins = as.BinMap{
|
||||||
|
"e": 2,
|
||||||
|
"pi": 3,
|
||||||
|
}
|
||||||
|
err = client.Add(nil, key, bins)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
},
|
||||||
|
QuerySets: true,
|
||||||
|
Sets: []string{"test/foo"},
|
||||||
|
DisableQueryNamespaces: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err = acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo"))
|
||||||
|
assert.False(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar"))
|
||||||
|
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_set"))
|
||||||
|
assert.True(t, acc.HasTag("aerospike_set", "set"))
|
||||||
|
assert.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes"))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDisableTTLHistogram(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
}
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
},
|
||||||
|
QuerySets: true,
|
||||||
|
EnableTTLHistogram: false,
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
No measurement exists
|
||||||
|
*/
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.False(t, acc.HasMeasurement("aerospike_histogram_ttl"))
|
||||||
|
}
|
||||||
|
func TestTTLHistogram(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
} else {
|
||||||
|
t.Skip("Skipping, only passes if the aerospike db has been running for at least 1 hour")
|
||||||
|
}
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
},
|
||||||
|
QuerySets: true,
|
||||||
|
EnableTTLHistogram: true,
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
Produces histogram
|
||||||
|
Measurment exists
|
||||||
|
Has appropriate tags (node name etc)
|
||||||
|
Has appropriate keys (time:value)
|
||||||
|
may be able to leverage histogram plugin
|
||||||
|
*/
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_histogram_ttl"))
|
||||||
|
assert.True(t, FindTagValue(&acc, "aerospike_histogram_ttl", "namespace", "test"))
|
||||||
|
|
||||||
|
}
|
||||||
|
func TestDisableObjectSizeLinearHistogram(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
}
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
},
|
||||||
|
QuerySets: true,
|
||||||
|
EnableObjectSizeLinearHistogram: false,
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
No Measurement
|
||||||
|
*/
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.False(t, acc.HasMeasurement("aerospike_histogram_object_size_linear"))
|
||||||
|
}
|
||||||
|
func TestObjectSizeLinearHistogram(t *testing.T) {
|
||||||
|
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping aerospike integration tests.")
|
||||||
|
} else {
|
||||||
|
t.Skip("Skipping, only passes if the aerospike db has been running for at least 1 hour")
|
||||||
|
}
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
},
|
||||||
|
QuerySets: true,
|
||||||
|
EnableObjectSizeLinearHistogram: true,
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
Produces histogram
|
||||||
|
Measurment exists
|
||||||
|
Has appropriate tags (node name etc)
|
||||||
|
Has appropriate keys (time:value)
|
||||||
|
|
||||||
|
*/
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
err := acc.GatherError(a.Gather)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_histogram_object_size_linear"))
|
||||||
|
assert.True(t, FindTagValue(&acc, "aerospike_histogram_object_size_linear", "namespace", "test"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseNodeInfo(t *testing.T) {
|
||||||
|
a := &Aerospike{}
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
stats := map[string]string{
|
||||||
|
"early_tsvc_from_proxy_error": "0",
|
||||||
|
"cluster_principal": "BB9020012AC4202",
|
||||||
|
"cluster_is_member": "true",
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"early_tsvc_from_proxy_error": int64(0),
|
||||||
|
"cluster_principal": "BB9020012AC4202",
|
||||||
|
"cluster_is_member": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"aerospike_host": "127.0.0.1:3000",
|
||||||
|
"node_name": "TestNodeName",
|
||||||
|
}
|
||||||
|
|
||||||
|
a.parseNodeInfo(stats, "127.0.0.1:3000", "TestNodeName", &acc)
|
||||||
|
acc.AssertContainsTaggedFields(t, "aerospike_node", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseNamespaceInfo(t *testing.T) {
|
||||||
|
a := &Aerospike{}
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
stats := map[string]string{
|
||||||
|
"namespace/test": "ns_cluster_size=1;effective_replication_factor=1;objects=2;tombstones=0;master_objects=2",
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"ns_cluster_size": int64(1),
|
||||||
|
"effective_replication_factor": int64(1),
|
||||||
|
"tombstones": int64(0),
|
||||||
|
"objects": int64(2),
|
||||||
|
"master_objects": int64(2),
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"aerospike_host": "127.0.0.1:3000",
|
||||||
|
"node_name": "TestNodeName",
|
||||||
|
"namespace": "test",
|
||||||
|
}
|
||||||
|
|
||||||
|
a.parseNamespaceInfo(stats, "127.0.0.1:3000", "test", "TestNodeName", &acc)
|
||||||
|
acc.AssertContainsTaggedFields(t, "aerospike_namespace", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseSetInfo(t *testing.T) {
|
||||||
|
a := &Aerospike{}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
stats := map[string]string{
|
||||||
|
"sets/test/foo": "objects=1:tombstones=0:memory_data_bytes=26;",
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"objects": int64(1),
|
||||||
|
"tombstones": int64(0),
|
||||||
|
"memory_data_bytes": int64(26),
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"aerospike_host": "127.0.0.1:3000",
|
||||||
|
"node_name": "TestNodeName",
|
||||||
|
"set": "test/foo",
|
||||||
|
}
|
||||||
|
a.parseSetInfo(stats, "127.0.0.1:3000", "test/foo", "TestNodeName", &acc)
|
||||||
|
acc.AssertContainsTaggedFields(t, "aerospike_set", expectedFields, expectedTags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseHistogramSet(t *testing.T) {
|
||||||
|
a := &Aerospike{
|
||||||
|
NumberHistogramBuckets: 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
stats := map[string]string{
|
||||||
|
"histogram:type=object-size-linear;namespace=test;set=foo": "units=bytes:hist-width=1048576:bucket-width=1024:buckets=0,1,3,1,6,1,9,1,12,1,15,1,18",
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"0": int64(1),
|
||||||
|
"1": int64(4),
|
||||||
|
"2": int64(7),
|
||||||
|
"3": int64(10),
|
||||||
|
"4": int64(13),
|
||||||
|
"5": int64(16),
|
||||||
|
"6": int64(18),
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"aerospike_host": "127.0.0.1:3000",
|
||||||
|
"node_name": "TestNodeName",
|
||||||
|
"namespace": "test",
|
||||||
|
"set": "foo",
|
||||||
|
}
|
||||||
|
|
||||||
|
a.parseHistogram(stats, "127.0.0.1:3000", "test", "foo", "object-size-linear", "TestNodeName", &acc)
|
||||||
|
acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags)
|
||||||
|
|
||||||
|
}
|
||||||
|
func TestParseHistogramNamespace(t *testing.T) {
|
||||||
|
a := &Aerospike{
|
||||||
|
NumberHistogramBuckets: 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
stats := map[string]string{
|
||||||
|
"histogram:type=object-size-linear;namespace=test;set=foo": " units=bytes:hist-width=1048576:bucket-width=1024:buckets=0,1,3,1,6,1,9,1,12,1,15,1,18",
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedFields := map[string]interface{}{
|
||||||
|
"0": int64(1),
|
||||||
|
"1": int64(4),
|
||||||
|
"2": int64(7),
|
||||||
|
"3": int64(10),
|
||||||
|
"4": int64(13),
|
||||||
|
"5": int64(16),
|
||||||
|
"6": int64(18),
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedTags := map[string]string{
|
||||||
|
"aerospike_host": "127.0.0.1:3000",
|
||||||
|
"node_name": "TestNodeName",
|
||||||
|
"namespace": "test",
|
||||||
|
}
|
||||||
|
|
||||||
|
a.parseHistogram(stats, "127.0.0.1:3000", "test", "", "object-size-linear", "TestNodeName", &acc)
|
||||||
|
acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags)
|
||||||
|
|
||||||
|
}
|
||||||
func TestAerospikeParseValue(t *testing.T) {
|
func TestAerospikeParseValue(t *testing.T) {
|
||||||
// uint64 with value bigger than int64 max
|
// uint64 with value bigger than int64 max
|
||||||
val := parseValue("18446744041841121751")
|
val := parseValue("18446744041841121751")
|
||||||
require.Equal(t, uint64(18446744041841121751), val)
|
require.Equal(t, uint64(18446744041841121751), val)
|
||||||
|
|
||||||
|
val = parseValue("true")
|
||||||
|
require.Equal(t, true, val)
|
||||||
|
|
||||||
// int values
|
// int values
|
||||||
val = parseValue("42")
|
val = parseValue("42")
|
||||||
require.Equal(t, val, int64(42), "must be parsed as int")
|
require.Equal(t, val, int64(42), "must be parsed as int")
|
||||||
|
|
@ -63,3 +468,16 @@ func TestAerospikeParseValue(t *testing.T) {
|
||||||
val = parseValue("BB977942A2CA502")
|
val = parseValue("BB977942A2CA502")
|
||||||
require.Equal(t, val, `BB977942A2CA502`, "must be left as string")
|
require.Equal(t, val, `BB977942A2CA502`, "must be left as string")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func FindTagValue(acc *testutil.Accumulator, measurement string, key string, value string) bool {
|
||||||
|
for _, p := range acc.Metrics {
|
||||||
|
if p.Measurement == measurement {
|
||||||
|
v, ok := p.Tags[key]
|
||||||
|
if ok && v == value {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue