Linter fixes for plugins/inputs/[ab]* (#9191)

This commit is contained in:
Paweł Żak 2021-04-26 22:57:05 +02:00 committed by GitHub
parent 83e7c3ec6a
commit e058f3641c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 342 additions and 319 deletions

View File

@ -28,18 +28,17 @@ All metrics are attempted to be cast to integers, then booleans, then strings.
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## If false, skip chain & host verification ## If false, skip chain & host verification
# insecure_skip_verify = true # insecure_skip_verify = true
# Feature Options # Feature Options
# Add namespace variable to limit the namespaces executed on # Add namespace variable to limit the namespaces executed on
# Leave blank to do all # Leave blank to do all
# disable_query_namespaces = true # default false # disable_query_namespaces = true # default false
# namespaces = ["namespace1", "namespace2"] # namespaces = ["namespace1", "namespace2"]
# Enable set level telmetry # Enable set level telemetry
# query_sets = true # default: false # query_sets = true # default: false
# Add namespace set combinations to limit sets executed on # Add namespace set combinations to limit sets executed on
# Leave blank to do all # Leave blank to do all sets
# sets = ["namespace1/set1", "namespace1/set2"]
# sets = ["namespace1/set1", "namespace1/set2", "namespace3"] # sets = ["namespace1/set1", "namespace1/set2", "namespace3"]
# Histograms # Histograms
@ -48,12 +47,10 @@ All metrics are attempted to be cast to integers, then booleans, then strings.
# by default, aerospike produces a 100 bucket histogram # by default, aerospike produces a 100 bucket histogram
# this is not great for most graphing tools, this will allow # this is not great for most graphing tools, this will allow
# the ability to squash this to a smaller number of buckets # the ability to squash this to a smaller number of buckets
# To have a balanced histogram, the number of buckets chosen # To have a balanced histogram, the number of buckets chosen
# should divide evenly into 100. # should divide evenly into 100.
# num_histogram_buckets = 100 # default: 10 # num_histogram_buckets = 100 # default: 10
``` ```
### Measurements: ### Measurements:

View File

@ -10,11 +10,11 @@ import (
"sync" "sync"
"time" "time"
as "github.com/aerospike/aerospike-client-go"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/plugins/common/tls" tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
as "github.com/aerospike/aerospike-client-go"
) )
type Aerospike struct { type Aerospike struct {
@ -65,7 +65,7 @@ var sampleConfig = `
# disable_query_namespaces = true # default false # disable_query_namespaces = true # default false
# namespaces = ["namespace1", "namespace2"] # namespaces = ["namespace1", "namespace2"]
# Enable set level telmetry # Enable set level telemetry
# query_sets = true # default: false # query_sets = true # default: false
# Add namespace set combinations to limit sets executed on # Add namespace set combinations to limit sets executed on
# Leave blank to do all sets # Leave blank to do all sets
@ -77,7 +77,9 @@ var sampleConfig = `
# by default, aerospike produces a 100 bucket histogram # by default, aerospike produces a 100 bucket histogram
# this is not great for most graphing tools, this will allow # this is not great for most graphing tools, this will allow
# the ability to squash this to a smaller number of buckets # the ability to squash this to a smaller number of buckets
# To have a balanced histogram, the number of buckets chosen
# should divide evenly into 100.
# num_histogram_buckets = 100 # default: 10 # num_histogram_buckets = 100 # default: 10
` `
@ -119,7 +121,7 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
} }
if len(a.Servers) == 0 { if len(a.Servers) == 0 {
return a.gatherServer("127.0.0.1:3000", acc) return a.gatherServer(acc, "127.0.0.1:3000")
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -127,7 +129,7 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
for _, server := range a.Servers { for _, server := range a.Servers {
go func(serv string) { go func(serv string) {
defer wg.Done() defer wg.Done()
acc.AddError(a.gatherServer(serv, acc)) acc.AddError(a.gatherServer(acc, serv))
}(server) }(server)
} }
@ -135,7 +137,7 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) error { func (a *Aerospike) gatherServer(acc telegraf.Accumulator, hostPort string) error {
host, port, err := net.SplitHostPort(hostPort) host, port, err := net.SplitHostPort(hostPort)
if err != nil { if err != nil {
return err return err
@ -162,7 +164,7 @@ func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) erro
if err != nil { if err != nil {
return err return err
} }
a.parseNodeInfo(stats, hostPort, n.GetName(), acc) a.parseNodeInfo(acc, stats, hostPort, n.GetName())
namespaces, err := a.getNamespaces(n) namespaces, err := a.getNamespaces(n)
if err != nil { if err != nil {
@ -176,18 +178,17 @@ func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) erro
if err != nil { if err != nil {
continue continue
} else {
a.parseNamespaceInfo(stats, hostPort, namespace, n.GetName(), acc)
} }
a.parseNamespaceInfo(acc, stats, hostPort, namespace, n.GetName())
if a.EnableTTLHistogram { if a.EnableTTLHistogram {
err = a.getTTLHistogram(hostPort, namespace, "", n, acc) err = a.getTTLHistogram(acc, hostPort, namespace, "", n)
if err != nil { if err != nil {
continue continue
} }
} }
if a.EnableObjectSizeLinearHistogram { if a.EnableObjectSizeLinearHistogram {
err = a.getObjectSizeLinearHistogram(hostPort, namespace, "", n, acc) err = a.getObjectSizeLinearHistogram(acc, hostPort, namespace, "", n)
if err != nil { if err != nil {
continue continue
} }
@ -200,24 +201,22 @@ func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) erro
if err == nil { if err == nil {
for _, namespaceSet := range namespaceSets { for _, namespaceSet := range namespaceSets {
namespace, set := splitNamespaceSet(namespaceSet) namespace, set := splitNamespaceSet(namespaceSet)
stats, err := a.getSetInfo(namespaceSet, n) stats, err := a.getSetInfo(namespaceSet, n)
if err != nil { if err != nil {
continue continue
} else {
a.parseSetInfo(stats, hostPort, namespaceSet, n.GetName(), acc)
} }
a.parseSetInfo(acc, stats, hostPort, namespaceSet, n.GetName())
if a.EnableTTLHistogram { if a.EnableTTLHistogram {
err = a.getTTLHistogram(hostPort, namespace, set, n, acc) err = a.getTTLHistogram(acc, hostPort, namespace, set, n)
if err != nil { if err != nil {
continue continue
} }
} }
if a.EnableObjectSizeLinearHistogram { if a.EnableObjectSizeLinearHistogram {
err = a.getObjectSizeLinearHistogram(hostPort, namespace, set, n, acc) err = a.getObjectSizeLinearHistogram(acc, hostPort, namespace, set, n)
if err != nil { if err != nil {
continue continue
} }
@ -238,7 +237,7 @@ func (a *Aerospike) getNodeInfo(n *as.Node) (map[string]string, error) {
return stats, nil return stats, nil
} }
func (a *Aerospike) parseNodeInfo(stats map[string]string, hostPort string, nodeName string, acc telegraf.Accumulator) { func (a *Aerospike) parseNodeInfo(acc telegraf.Accumulator, stats map[string]string, hostPort string, nodeName string) {
tags := map[string]string{ tags := map[string]string{
"aerospike_host": hostPort, "aerospike_host": hostPort,
"node_name": nodeName, "node_name": nodeName,
@ -275,7 +274,7 @@ func (a *Aerospike) getNamespaceInfo(namespace string, n *as.Node) (map[string]s
return stats, err return stats, err
} }
func (a *Aerospike) parseNamespaceInfo(stats map[string]string, hostPort string, namespace string, nodeName string, acc telegraf.Accumulator) { func (a *Aerospike) parseNamespaceInfo(acc telegraf.Accumulator, stats map[string]string, hostPort string, namespace string, nodeName string) {
nTags := map[string]string{ nTags := map[string]string{
"aerospike_host": hostPort, "aerospike_host": hostPort,
"node_name": nodeName, "node_name": nodeName,
@ -341,7 +340,7 @@ func (a *Aerospike) getSetInfo(namespaceSet string, n *as.Node) (map[string]stri
return stats, nil return stats, nil
} }
func (a *Aerospike) parseSetInfo(stats map[string]string, hostPort string, namespaceSet string, nodeName string, acc telegraf.Accumulator) { func (a *Aerospike) parseSetInfo(acc telegraf.Accumulator, stats map[string]string, hostPort string, namespaceSet string, nodeName string) {
stat := strings.Split( stat := strings.Split(
strings.TrimSuffix( strings.TrimSuffix(
stats[fmt.Sprintf("sets/%s", namespaceSet)], ";"), ":") stats[fmt.Sprintf("sets/%s", namespaceSet)], ";"), ":")
@ -363,22 +362,26 @@ func (a *Aerospike) parseSetInfo(stats map[string]string, hostPort string, names
acc.AddFields("aerospike_set", nFields, nTags, time.Now()) acc.AddFields("aerospike_set", nFields, nTags, time.Now())
} }
func (a *Aerospike) getTTLHistogram(hostPort string, namespace string, set string, n *as.Node, acc telegraf.Accumulator) error { func (a *Aerospike) getTTLHistogram(acc telegraf.Accumulator, hostPort string, namespace string, set string, n *as.Node) error {
stats, err := a.getHistogram(namespace, set, "ttl", n) stats, err := a.getHistogram(namespace, set, "ttl", n)
if err != nil { if err != nil {
return err return err
} }
a.parseHistogram(stats, hostPort, namespace, set, "ttl", n.GetName(), acc)
nTags := createTags(hostPort, n.GetName(), namespace, set)
a.parseHistogram(acc, stats, nTags, "ttl")
return nil return nil
} }
func (a *Aerospike) getObjectSizeLinearHistogram(hostPort string, namespace string, set string, n *as.Node, acc telegraf.Accumulator) error { func (a *Aerospike) getObjectSizeLinearHistogram(acc telegraf.Accumulator, hostPort string, namespace string, set string, n *as.Node) error {
stats, err := a.getHistogram(namespace, set, "object-size-linear", n) stats, err := a.getHistogram(namespace, set, "object-size-linear", n)
if err != nil { if err != nil {
return err return err
} }
a.parseHistogram(stats, hostPort, namespace, set, "object-size-linear", n.GetName(), acc)
nTags := createTags(hostPort, n.GetName(), namespace, set)
a.parseHistogram(acc, stats, nTags, "object-size-linear")
return nil return nil
} }
@ -398,17 +401,7 @@ func (a *Aerospike) getHistogram(namespace string, set string, histogramType str
return stats, nil return stats, nil
} }
func (a *Aerospike) parseHistogram(stats map[string]string, hostPort string, namespace string, set string, histogramType string, nodeName string, acc telegraf.Accumulator) { func (a *Aerospike) parseHistogram(acc telegraf.Accumulator, stats map[string]string, nTags map[string]string, histogramType string) {
nTags := map[string]string{
"aerospike_host": hostPort,
"node_name": nodeName,
"namespace": namespace,
}
if len(set) > 0 {
nTags["set"] = set
}
nFields := make(map[string]interface{}) nFields := make(map[string]interface{})
for _, stat := range stats { for _, stat := range stats {
@ -421,7 +414,7 @@ func (a *Aerospike) parseHistogram(stats map[string]string, hostPort string, nam
if pieces[0] == "buckets" { if pieces[0] == "buckets" {
buckets := strings.Split(pieces[1], ",") buckets := strings.Split(pieces[1], ",")
// Normalize incase of less buckets than expected // Normalize in case of less buckets than expected
numRecordsPerBucket := 1 numRecordsPerBucket := 1
if len(buckets) > a.NumberHistogramBuckets { if len(buckets) > a.NumberHistogramBuckets {
numRecordsPerBucket = int(math.Ceil(float64(len(buckets)) / float64(a.NumberHistogramBuckets))) numRecordsPerBucket = int(math.Ceil(float64(len(buckets)) / float64(a.NumberHistogramBuckets)))
@ -458,7 +451,7 @@ func (a *Aerospike) parseHistogram(stats map[string]string, hostPort string, nam
acc.AddFields(fmt.Sprintf("aerospike_histogram_%v", strings.Replace(histogramType, "-", "_", -1)), nFields, nTags, time.Now()) acc.AddFields(fmt.Sprintf("aerospike_histogram_%v", strings.Replace(histogramType, "-", "_", -1)), nFields, nTags, time.Now())
} }
func splitNamespaceSet(namespaceSet string) (string, string) { func splitNamespaceSet(namespaceSet string) (namespace string, set string) {
split := strings.Split(namespaceSet, "/") split := strings.Split(namespaceSet, "/")
return split[0], split[1] return split[0], split[1]
} }
@ -478,6 +471,19 @@ func parseAerospikeValue(key string, v string) interface{} {
} }
} }
func createTags(hostPort string, nodeName string, namespace string, set string) map[string]string {
nTags := map[string]string{
"aerospike_host": hostPort,
"node_name": nodeName,
"namespace": namespace,
}
if len(set) > 0 {
nTags["set"] = set
}
return nTags
}
func init() { func init() {
inputs.Add("aerospike", func() telegraf.Input { inputs.Add("aerospike", func() telegraf.Input {
return &Aerospike{} return &Aerospike{}

View File

@ -4,9 +4,9 @@ import (
"testing" "testing"
as "github.com/aerospike/aerospike-client-go" as "github.com/aerospike/aerospike-client-go"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
) )
func TestAerospikeStatisticsIntegration(t *testing.T) { func TestAerospikeStatisticsIntegration(t *testing.T) {
@ -23,14 +23,14 @@ func TestAerospikeStatisticsIntegration(t *testing.T) {
err := acc.GatherError(a.Gather) err := acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasMeasurement("aerospike_node")) require.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasTag("aerospike_node", "node_name")) require.True(t, acc.HasTag("aerospike_node", "node_name"))
assert.True(t, acc.HasMeasurement("aerospike_namespace")) require.True(t, acc.HasMeasurement("aerospike_namespace"))
assert.True(t, acc.HasTag("aerospike_namespace", "node_name")) require.True(t, acc.HasTag("aerospike_namespace", "node_name"))
assert.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error")) require.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error"))
namespaceName := acc.TagValue("aerospike_namespace", "namespace") namespaceName := acc.TagValue("aerospike_namespace", "namespace")
assert.Equal(t, namespaceName, "test") require.Equal(t, "test", namespaceName)
} }
func TestAerospikeStatisticsPartialErrIntegration(t *testing.T) { func TestAerospikeStatisticsPartialErrIntegration(t *testing.T) {
@ -50,14 +50,14 @@ func TestAerospikeStatisticsPartialErrIntegration(t *testing.T) {
require.Error(t, err) require.Error(t, err)
assert.True(t, acc.HasMeasurement("aerospike_node")) require.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasMeasurement("aerospike_namespace")) require.True(t, acc.HasMeasurement("aerospike_namespace"))
assert.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error")) require.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error"))
namespaceName := acc.TagSetValue("aerospike_namespace", "namespace") namespaceName := acc.TagSetValue("aerospike_namespace", "namespace")
assert.Equal(t, namespaceName, "test") require.Equal(t, "test", namespaceName)
} }
func TestSelectNamepsacesIntegration(t *testing.T) { func TestSelectNamespacesIntegration(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("Skipping aerospike integration tests.") t.Skip("Skipping aerospike integration tests.")
} }
@ -73,10 +73,10 @@ func TestSelectNamepsacesIntegration(t *testing.T) {
err := acc.GatherError(a.Gather) err := acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasMeasurement("aerospike_node")) require.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasTag("aerospike_node", "node_name")) require.True(t, acc.HasTag("aerospike_node", "node_name"))
assert.True(t, acc.HasMeasurement("aerospike_namespace")) require.True(t, acc.HasMeasurement("aerospike_namespace"))
assert.True(t, acc.HasTag("aerospike_namespace", "node_name")) require.True(t, acc.HasTag("aerospike_namespace", "node_name"))
// Expect only 1 namespace // Expect only 1 namespace
count := 0 count := 0
@ -85,10 +85,10 @@ func TestSelectNamepsacesIntegration(t *testing.T) {
count++ count++
} }
} }
assert.Equal(t, count, 1) require.Equal(t, 1, count)
// expect namespace to have no fields as nonexistent // expect namespace to have no fields as nonexistent
assert.False(t, acc.HasInt64Field("aerospke_namespace", "appeals_tx_remaining")) require.False(t, acc.HasInt64Field("aerospke_namespace", "appeals_tx_remaining"))
} }
func TestDisableQueryNamespacesIntegration(t *testing.T) { func TestDisableQueryNamespacesIntegration(t *testing.T) {
@ -107,15 +107,15 @@ func TestDisableQueryNamespacesIntegration(t *testing.T) {
err := acc.GatherError(a.Gather) err := acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasMeasurement("aerospike_node")) require.True(t, acc.HasMeasurement("aerospike_node"))
assert.False(t, acc.HasMeasurement("aerospike_namespace")) require.False(t, acc.HasMeasurement("aerospike_namespace"))
a.DisableQueryNamespaces = false a.DisableQueryNamespaces = false
err = acc.GatherError(a.Gather) err = acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasMeasurement("aerospike_node")) require.True(t, acc.HasMeasurement("aerospike_node"))
assert.True(t, acc.HasMeasurement("aerospike_namespace")) require.True(t, acc.HasMeasurement("aerospike_namespace"))
} }
func TestQuerySetsIntegration(t *testing.T) { func TestQuerySetsIntegration(t *testing.T) {
@ -127,6 +127,7 @@ func TestQuerySetsIntegration(t *testing.T) {
// test is the default namespace from aerospike // test is the default namespace from aerospike
policy := as.NewClientPolicy() policy := as.NewClientPolicy()
client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000) client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000)
require.NoError(t, err)
key, err := as.NewKey("test", "foo", 123) key, err := as.NewKey("test", "foo", 123)
require.NoError(t, err) require.NoError(t, err)
@ -158,12 +159,12 @@ func TestQuerySetsIntegration(t *testing.T) {
err = acc.GatherError(a.Gather) err = acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo")) require.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo"))
assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar")) require.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar"))
assert.True(t, acc.HasMeasurement("aerospike_set")) require.True(t, acc.HasMeasurement("aerospike_set"))
assert.True(t, acc.HasTag("aerospike_set", "set")) require.True(t, acc.HasTag("aerospike_set", "set"))
assert.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes")) require.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes"))
} }
func TestSelectQuerySetsIntegration(t *testing.T) { func TestSelectQuerySetsIntegration(t *testing.T) {
@ -175,6 +176,7 @@ func TestSelectQuerySetsIntegration(t *testing.T) {
// test is the default namespace from aerospike // test is the default namespace from aerospike
policy := as.NewClientPolicy() policy := as.NewClientPolicy()
client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000) client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000)
require.NoError(t, err)
key, err := as.NewKey("test", "foo", 123) key, err := as.NewKey("test", "foo", 123)
require.NoError(t, err) require.NoError(t, err)
@ -207,12 +209,12 @@ func TestSelectQuerySetsIntegration(t *testing.T) {
err = acc.GatherError(a.Gather) err = acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo")) require.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo"))
assert.False(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar")) require.False(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar"))
assert.True(t, acc.HasMeasurement("aerospike_set")) require.True(t, acc.HasMeasurement("aerospike_set"))
assert.True(t, acc.HasTag("aerospike_set", "set")) require.True(t, acc.HasTag("aerospike_set", "set"))
assert.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes")) require.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes"))
} }
func TestDisableTTLHistogramIntegration(t *testing.T) { func TestDisableTTLHistogramIntegration(t *testing.T) {
@ -233,7 +235,7 @@ func TestDisableTTLHistogramIntegration(t *testing.T) {
err := acc.GatherError(a.Gather) err := acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.False(t, acc.HasMeasurement("aerospike_histogram_ttl")) require.False(t, acc.HasMeasurement("aerospike_histogram_ttl"))
} }
func TestTTLHistogramIntegration(t *testing.T) { func TestTTLHistogramIntegration(t *testing.T) {
if testing.Short() { if testing.Short() {
@ -250,7 +252,7 @@ func TestTTLHistogramIntegration(t *testing.T) {
} }
/* /*
Produces histogram Produces histogram
Measurment exists Measurement exists
Has appropriate tags (node name etc) Has appropriate tags (node name etc)
Has appropriate keys (time:value) Has appropriate keys (time:value)
may be able to leverage histogram plugin may be able to leverage histogram plugin
@ -259,8 +261,8 @@ func TestTTLHistogramIntegration(t *testing.T) {
err := acc.GatherError(a.Gather) err := acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasMeasurement("aerospike_histogram_ttl")) require.True(t, acc.HasMeasurement("aerospike_histogram_ttl"))
assert.True(t, FindTagValue(&acc, "aerospike_histogram_ttl", "namespace", "test")) require.True(t, FindTagValue(&acc, "aerospike_histogram_ttl", "namespace", "test"))
} }
func TestDisableObjectSizeLinearHistogramIntegration(t *testing.T) { func TestDisableObjectSizeLinearHistogramIntegration(t *testing.T) {
if testing.Short() { if testing.Short() {
@ -280,7 +282,7 @@ func TestDisableObjectSizeLinearHistogramIntegration(t *testing.T) {
err := acc.GatherError(a.Gather) err := acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.False(t, acc.HasMeasurement("aerospike_histogram_object_size_linear")) require.False(t, acc.HasMeasurement("aerospike_histogram_object_size_linear"))
} }
func TestObjectSizeLinearHistogramIntegration(t *testing.T) { func TestObjectSizeLinearHistogramIntegration(t *testing.T) {
if testing.Short() { if testing.Short() {
@ -297,7 +299,7 @@ func TestObjectSizeLinearHistogramIntegration(t *testing.T) {
} }
/* /*
Produces histogram Produces histogram
Measurment exists Measurement exists
Has appropriate tags (node name etc) Has appropriate tags (node name etc)
Has appropriate keys (time:value) Has appropriate keys (time:value)
@ -305,8 +307,8 @@ func TestObjectSizeLinearHistogramIntegration(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(a.Gather) err := acc.GatherError(a.Gather)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, acc.HasMeasurement("aerospike_histogram_object_size_linear")) require.True(t, acc.HasMeasurement("aerospike_histogram_object_size_linear"))
assert.True(t, FindTagValue(&acc, "aerospike_histogram_object_size_linear", "namespace", "test")) require.True(t, FindTagValue(&acc, "aerospike_histogram_object_size_linear", "namespace", "test"))
} }
func TestParseNodeInfo(t *testing.T) { func TestParseNodeInfo(t *testing.T) {
@ -330,7 +332,7 @@ func TestParseNodeInfo(t *testing.T) {
"node_name": "TestNodeName", "node_name": "TestNodeName",
} }
a.parseNodeInfo(stats, "127.0.0.1:3000", "TestNodeName", &acc) a.parseNodeInfo(&acc, stats, "127.0.0.1:3000", "TestNodeName")
acc.AssertContainsTaggedFields(t, "aerospike_node", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "aerospike_node", expectedFields, expectedTags)
} }
@ -356,7 +358,7 @@ func TestParseNamespaceInfo(t *testing.T) {
"namespace": "test", "namespace": "test",
} }
a.parseNamespaceInfo(stats, "127.0.0.1:3000", "test", "TestNodeName", &acc) a.parseNamespaceInfo(&acc, stats, "127.0.0.1:3000", "test", "TestNodeName")
acc.AssertContainsTaggedFields(t, "aerospike_namespace", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "aerospike_namespace", expectedFields, expectedTags)
} }
@ -380,7 +382,7 @@ func TestParseSetInfo(t *testing.T) {
"node_name": "TestNodeName", "node_name": "TestNodeName",
"set": "test/foo", "set": "test/foo",
} }
a.parseSetInfo(stats, "127.0.0.1:3000", "test/foo", "TestNodeName", &acc) a.parseSetInfo(&acc, stats, "127.0.0.1:3000", "test/foo", "TestNodeName")
acc.AssertContainsTaggedFields(t, "aerospike_set", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "aerospike_set", expectedFields, expectedTags)
} }
@ -412,7 +414,8 @@ func TestParseHistogramSet(t *testing.T) {
"set": "foo", "set": "foo",
} }
a.parseHistogram(stats, "127.0.0.1:3000", "test", "foo", "object-size-linear", "TestNodeName", &acc) nTags := createTags("127.0.0.1:3000", "TestNodeName", "test", "foo")
a.parseHistogram(&acc, stats, nTags, "object-size-linear")
acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags)
} }
func TestParseHistogramNamespace(t *testing.T) { func TestParseHistogramNamespace(t *testing.T) {
@ -442,16 +445,17 @@ func TestParseHistogramNamespace(t *testing.T) {
"namespace": "test", "namespace": "test",
} }
a.parseHistogram(stats, "127.0.0.1:3000", "test", "", "object-size-linear", "TestNodeName", &acc) nTags := createTags("127.0.0.1:3000", "TestNodeName", "test", "")
a.parseHistogram(&acc, stats, nTags, "object-size-linear")
acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags)
} }
func TestAerospikeParseValue(t *testing.T) { func TestAerospikeParseValue(t *testing.T) {
// uint64 with value bigger than int64 max // uint64 with value bigger than int64 max
val := parseAerospikeValue("", "18446744041841121751") val := parseAerospikeValue("", "18446744041841121751")
require.Equal(t, val, uint64(18446744041841121751)) require.Equal(t, uint64(18446744041841121751), val)
val = parseAerospikeValue("", "true") val = parseAerospikeValue("", "true")
require.Equal(t, val, true) require.Equal(t, true, val)
// int values // int values
val = parseAerospikeValue("", "42") val = parseAerospikeValue("", "42")

View File

@ -11,13 +11,14 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/sdk" "github.com/aliyun/alibaba-cloud-sdk-go/sdk"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers"
"github.com/aliyun/alibaba-cloud-sdk-go/services/cms" "github.com/aliyun/alibaba-cloud-sdk-go/services/cms"
"github.com/jmespath/go-jmespath"
"github.com/pkg/errors"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/jmespath/go-jmespath"
"github.com/pkg/errors"
) )
const ( const (
@ -161,7 +162,7 @@ type (
dtLock sync.Mutex //Guard for discoveryTags & dimensions dtLock sync.Mutex //Guard for discoveryTags & dimensions
discoveryTags map[string]map[string]string //Internal data structure that can enrich metrics with tags discoveryTags map[string]map[string]string //Internal data structure that can enrich metrics with tags
dimensionsUdObj map[string]string dimensionsUdObj map[string]string
dimensionsUdArr []map[string]string //Parsed Dimesnsions JSON string (unmarshalled) dimensionsUdArr []map[string]string //Parsed Dimensions JSON string (unmarshalled)
requestDimensions []map[string]string //this is the actual dimensions list that would be used in API request requestDimensions []map[string]string //this is the actual dimensions list that would be used in API request
requestDimensionsStr string //String representation of the above requestDimensionsStr string //String representation of the above
@ -239,7 +240,7 @@ func (s *AliyunCMS) Init() error {
//Init discovery... //Init discovery...
if s.dt == nil { //Support for tests if s.dt == nil { //Support for tests
s.dt, err = NewDiscoveryTool(s.DiscoveryRegions, s.Project, s.Log, credential, int(float32(s.RateLimit)*0.2), time.Duration(s.DiscoveryInterval)) s.dt, err = newDiscoveryTool(s.DiscoveryRegions, s.Project, s.Log, credential, int(float32(s.RateLimit)*0.2), time.Duration(s.DiscoveryInterval))
if err != nil { if err != nil {
s.Log.Errorf("Discovery tool is not activated: %v", err) s.Log.Errorf("Discovery tool is not activated: %v", err)
s.dt = nil s.dt = nil
@ -395,8 +396,8 @@ func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, me
} }
//Tag helper //Tag helper
func parseTag(tagSpec string, data interface{}) (string, string, error) { func parseTag(tagSpec string, data interface{}) (tagKey string, tagValue string, err error) {
tagKey := tagSpec tagKey = tagSpec
queryPath := tagSpec queryPath := tagSpec
//Split query path to tagKey and query path //Split query path to tagKey and query path

View File

@ -12,12 +12,13 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses"
"github.com/aliyun/alibaba-cloud-sdk-go/services/cms" "github.com/aliyun/alibaba-cloud-sdk-go/services/cms"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
) )
const inputTitle = "inputs.aliyuncms" const inputTitle = "inputs.aliyuncms"
@ -95,7 +96,7 @@ func getDiscoveryTool(project string, discoverRegions []string) (*discoveryTool,
return nil, errors.Errorf("failed to retrieve credential: %v", err) return nil, errors.Errorf("failed to retrieve credential: %v", err)
} }
dt, err := NewDiscoveryTool(discoverRegions, project, testutil.Logger{Name: inputTitle}, credential, 1, time.Minute*2) dt, err := newDiscoveryTool(discoverRegions, project, testutil.Logger{Name: inputTitle}, credential, 1, time.Minute*2)
if err != nil { if err != nil {
return nil, errors.Errorf("Can't create discovery tool object: %v", err) return nil, errors.Errorf("Can't create discovery tool object: %v", err)

View File

@ -2,7 +2,6 @@ package aliyuncms
import ( import (
"encoding/json" "encoding/json"
"github.com/influxdata/telegraf"
"reflect" "reflect"
"regexp" "regexp"
"strconv" "strconv"
@ -17,8 +16,10 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/services/rds" "github.com/aliyun/alibaba-cloud-sdk-go/services/rds"
"github.com/aliyun/alibaba-cloud-sdk-go/services/slb" "github.com/aliyun/alibaba-cloud-sdk-go/services/slb"
"github.com/aliyun/alibaba-cloud-sdk-go/services/vpc" "github.com/aliyun/alibaba-cloud-sdk-go/services/vpc"
"github.com/influxdata/telegraf/internal/limiter"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/limiter"
) )
// https://www.alibabacloud.com/help/doc-detail/40654.htm?gclid=Cj0KCQjw4dr0BRCxARIsAKUNjWTAMfyVUn_Y3OevFBV3CMaazrhq0URHsgE7c0m0SeMQRKlhlsJGgIEaAviyEALw_wcB // https://www.alibabacloud.com/help/doc-detail/40654.htm?gclid=Cj0KCQjw4dr0BRCxARIsAKUNjWTAMfyVUn_Y3OevFBV3CMaazrhq0URHsgE7c0m0SeMQRKlhlsJGgIEaAviyEALw_wcB
@ -69,6 +70,13 @@ type discoveryTool struct {
lg telegraf.Logger //Telegraf logger (should be provided) lg telegraf.Logger //Telegraf logger (should be provided)
} }
type response struct {
discData []interface{}
totalCount int
pageSize int
pageNumber int
}
//getRPCReqFromDiscoveryRequest - utility function to map between aliyun request primitives //getRPCReqFromDiscoveryRequest - utility function to map between aliyun request primitives
//discoveryRequest represents different type of discovery requests //discoveryRequest represents different type of discovery requests
func getRPCReqFromDiscoveryRequest(req discoveryRequest) (*requests.RpcRequest, error) { func getRPCReqFromDiscoveryRequest(req discoveryRequest) (*requests.RpcRequest, error) {
@ -97,13 +105,13 @@ func getRPCReqFromDiscoveryRequest(req discoveryRequest) (*requests.RpcRequest,
return nil, errors.Errorf("Didn't find *requests.RpcRequest embedded struct in %q", ptrV.Type()) return nil, errors.Errorf("Didn't find *requests.RpcRequest embedded struct in %q", ptrV.Type())
} }
//NewDiscoveryTool function returns discovery tool object. //newDiscoveryTool function returns discovery tool object.
//The object is used to periodically get data about aliyun objects and send this //The object is used to periodically get data about aliyun objects and send this
//data into channel. The intention is to enrich reported metrics with discovery data. //data into channel. The intention is to enrich reported metrics with discovery data.
//Discovery is supported for a limited set of object types (defined by project) and can be extended in future. //Discovery is supported for a limited set of object types (defined by project) and can be extended in future.
//Discovery can be limited by region if not set, then all regions is queried. //Discovery can be limited by region if not set, then all regions is queried.
//Request against API can inquire additional costs, consult with aliyun API documentation. //Request against API can inquire additional costs, consult with aliyun API documentation.
func NewDiscoveryTool(regions []string, project string, lg telegraf.Logger, credential auth.Credential, rateLimit int, discoveryInterval time.Duration) (*discoveryTool, error) { func newDiscoveryTool(regions []string, project string, lg telegraf.Logger, credential auth.Credential, rateLimit int, discoveryInterval time.Duration) (*discoveryTool, error) {
var ( var (
dscReq = map[string]discoveryRequest{} dscReq = map[string]discoveryRequest{}
cli = map[string]aliyunSdkClient{} cli = map[string]aliyunSdkClient{}
@ -292,22 +300,22 @@ func NewDiscoveryTool(regions []string, project string, lg telegraf.Logger, cred
}, nil }, nil
} }
func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (discData []interface{}, totalCount int, pageSize int, pageNumber int, err error) { func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (discoveryResponse *response, err error) {
var ( var (
fullOutput = map[string]interface{}{} fullOutput = map[string]interface{}{}
data []byte foundDataItem, foundRootKey bool
foundDataItem bool discData []interface{}
foundRootKey bool totalCount, pageSize, pageNumber int
) )
data = resp.GetHttpContentBytes() data := resp.GetHttpContentBytes()
if data == nil { //No data if data == nil { //No data
return nil, 0, 0, 0, errors.Errorf("No data in response to be parsed") return nil, errors.Errorf("No data in response to be parsed")
} }
err = json.Unmarshal(data, &fullOutput) err = json.Unmarshal(data, &fullOutput)
if err != nil { if err != nil {
return nil, 0, 0, 0, errors.Errorf("Can't parse JSON from discovery response: %v", err) return nil, errors.Errorf("Can't parse JSON from discovery response: %v", err)
} }
for key, val := range fullOutput { for key, val := range fullOutput {
@ -316,7 +324,7 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse)
foundRootKey = true foundRootKey = true
rootKeyVal, ok := val.(map[string]interface{}) rootKeyVal, ok := val.(map[string]interface{})
if !ok { if !ok {
return nil, 0, 0, 0, errors.Errorf("Content of root key %q, is not an object: %v", key, val) return nil, errors.Errorf("Content of root key %q, is not an object: %v", key, val)
} }
//It should contain the array with discovered data //It should contain the array with discovered data
@ -326,7 +334,7 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse)
} }
} }
if !foundDataItem { if !foundDataItem {
return nil, 0, 0, 0, errors.Errorf("Didn't find array item in root key %q", key) return nil, errors.Errorf("Didn't find array item in root key %q", key)
} }
case "TotalCount": case "TotalCount":
totalCount = int(val.(float64)) totalCount = int(val.(float64))
@ -337,55 +345,54 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse)
} }
} }
if !foundRootKey { if !foundRootKey {
return nil, 0, 0, 0, errors.Errorf("Didn't find root key %q in discovery response", dt.respRootKey) return nil, errors.Errorf("Didn't find root key %q in discovery response", dt.respRootKey)
} }
return return &response{
discData: discData,
totalCount: totalCount,
pageSize: pageSize,
pageNumber: pageNumber,
}, nil
} }
func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, limiter chan bool) (map[string]interface{}, error) { func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, limiterChan chan bool) (map[string]interface{}, error) {
var ( var discoveryData []interface{}
err error
resp *responses.CommonResponse
data []interface{}
discoveryData []interface{}
totalCount int
pageNumber int
)
defer delete(req.QueryParams, "PageNumber") defer delete(req.QueryParams, "PageNumber")
for { for {
if limiter != nil { if limiterChan != nil {
<-limiter //Rate limiting <-limiterChan //Rate limiting
} }
resp, err = cli.ProcessCommonRequest(req) resp, err := cli.ProcessCommonRequest(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, totalCount, _, pageNumber, err = dt.parseDiscoveryResponse(resp) discoveryResponse, err := dt.parseDiscoveryResponse(resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
discoveryData = append(discoveryData, data...) discoveryData = append(discoveryData, discoveryResponse.discData...)
//Pagination //Pagination
pageNumber++ discoveryResponse.pageNumber++
req.QueryParams["PageNumber"] = strconv.Itoa(pageNumber) req.QueryParams["PageNumber"] = strconv.Itoa(discoveryResponse.pageNumber)
if len(discoveryData) == totalCount { //All data received if len(discoveryData) == discoveryResponse.totalCount { //All data received
//Map data to appropriate shape before return //Map data to appropriate shape before return
preparedData := map[string]interface{}{} preparedData := map[string]interface{}{}
for _, raw := range discoveryData { for _, raw := range discoveryData {
if elem, ok := raw.(map[string]interface{}); ok { elem, ok := raw.(map[string]interface{})
if objectID, ok := elem[dt.respObjectIDKey].(string); ok { if !ok {
preparedData[objectID] = elem
}
} else {
return nil, errors.Errorf("Can't parse input data element, not a map[string]interface{} type") return nil, errors.Errorf("Can't parse input data element, not a map[string]interface{} type")
} }
if objectID, ok := elem[dt.respObjectIDKey].(string); ok {
preparedData[objectID] = elem
}
} }
return preparedData, nil return preparedData, nil
@ -393,7 +400,7 @@ func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.Com
} }
} }
func (dt *discoveryTool) getDiscoveryDataAllRegions(limiter chan bool) (map[string]interface{}, error) { func (dt *discoveryTool) getDiscoveryDataAllRegions(limiterChan chan bool) (map[string]interface{}, error) {
var ( var (
data map[string]interface{} data map[string]interface{}
resultData = map[string]interface{}{} resultData = map[string]interface{}{}
@ -424,7 +431,7 @@ func (dt *discoveryTool) getDiscoveryDataAllRegions(limiter chan bool) (map[stri
commonRequest.TransToAcsRequest() commonRequest.TransToAcsRequest()
//Get discovery data using common request //Get discovery data using common request
data, err = dt.getDiscoveryData(cli, commonRequest, limiter) data, err = dt.getDiscoveryData(cli, commonRequest, limiterChan)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -43,7 +43,7 @@ The following defaults are known to work with RabbitMQ:
# exchange_arguments = { } # exchange_arguments = { }
# exchange_arguments = {"hash_property" = "timestamp"} # exchange_arguments = {"hash_property" = "timestamp"}
## AMQP queue name ## AMQP queue name.
queue = "telegraf" queue = "telegraf"
## AMQP queue durability can be "transient" or "durable". ## AMQP queue durability can be "transient" or "durable".

View File

@ -9,12 +9,13 @@ import (
"sync" "sync"
"time" "time"
"github.com/streadway/amqp"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/streadway/amqp"
) )
const ( const (
@ -183,7 +184,7 @@ func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
// make new tls config // make new tls config
tls, err := a.ClientConfig.TLSConfig() tlsCfg, err := a.ClientConfig.TLSConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -201,7 +202,7 @@ func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
} }
config := amqp.Config{ config := amqp.Config{
TLSClientConfig: tls, TLSClientConfig: tlsCfg,
SASL: auth, // if nil, it will be PLAIN SASL: auth, // if nil, it will be PLAIN
} }
return &config, nil return &config, nil
@ -292,12 +293,9 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
} }
if a.Exchange != "" { if a.Exchange != "" {
var exchangeDurable = true exchangeDurable := true
switch a.ExchangeDurability { if a.ExchangeDurability == "transient" {
case "transient":
exchangeDurable = false exchangeDurable = false
default:
exchangeDurable = true
} }
exchangeArgs := make(amqp.Table, len(a.ExchangeArguments)) exchangeArgs := make(amqp.Table, len(a.ExchangeArguments))
@ -305,11 +303,8 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
exchangeArgs[k] = v exchangeArgs[k] = v
} }
err = declareExchange( err = a.declareExchange(
ch, ch,
a.Exchange,
a.ExchangeType,
a.ExchangePassive,
exchangeDurable, exchangeDurable,
exchangeArgs) exchangeArgs)
if err != nil { if err != nil {
@ -317,11 +312,7 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
} }
} }
q, err := declareQueue( q, err := a.declareQueue(ch)
ch,
a.Queue,
a.QueueDurability,
a.QueuePassive)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -364,19 +355,16 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return msgs, err return msgs, err
} }
func declareExchange( func (a *AMQPConsumer) declareExchange(
channel *amqp.Channel, channel *amqp.Channel,
exchangeName string,
exchangeType string,
exchangePassive bool,
exchangeDurable bool, exchangeDurable bool,
exchangeArguments amqp.Table, exchangeArguments amqp.Table,
) error { ) error {
var err error var err error
if exchangePassive { if a.ExchangePassive {
err = channel.ExchangeDeclarePassive( err = channel.ExchangeDeclarePassive(
exchangeName, a.Exchange,
exchangeType, a.ExchangeType,
exchangeDurable, exchangeDurable,
false, // delete when unused false, // delete when unused
false, // internal false, // internal
@ -385,8 +373,8 @@ func declareExchange(
) )
} else { } else {
err = channel.ExchangeDeclare( err = channel.ExchangeDeclare(
exchangeName, a.Exchange,
exchangeType, a.ExchangeType,
exchangeDurable, exchangeDurable,
false, // delete when unused false, // delete when unused
false, // internal false, // internal
@ -400,26 +388,18 @@ func declareExchange(
return nil return nil
} }
func declareQueue( func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error) {
channel *amqp.Channel,
queueName string,
queueDurability string,
queuePassive bool,
) (*amqp.Queue, error) {
var queue amqp.Queue var queue amqp.Queue
var err error var err error
var queueDurable = true queueDurable := true
switch queueDurability { if a.QueueDurability == "transient" {
case "transient":
queueDurable = false queueDurable = false
default:
queueDurable = true
} }
if queuePassive { if a.QueuePassive {
queue, err = channel.QueueDeclarePassive( queue, err = channel.QueueDeclarePassive(
queueName, // queue a.Queue, // queue
queueDurable, // durable queueDurable, // durable
false, // delete when unused false, // delete when unused
false, // exclusive false, // exclusive
@ -428,7 +408,7 @@ func declareQueue(
) )
} else { } else {
queue, err = channel.QueueDeclare( queue, err = channel.QueueDeclare(
queueName, // queue a.Queue, // queue
queueDurable, // durable queueDurable, // durable
false, // delete when unused false, // delete when unused
false, // exclusive false, // exclusive

View File

@ -158,31 +158,31 @@ func (n *Apache) gatherURL(addr *url.URL, acc telegraf.Accumulator) error {
} }
func (n *Apache) gatherScores(data string) map[string]interface{} { func (n *Apache) gatherScores(data string) map[string]interface{} {
var waiting, open int = 0, 0 var waiting, open = 0, 0
var S, R, W, K, D, C, L, G, I int = 0, 0, 0, 0, 0, 0, 0, 0, 0 var s, r, w, k, d, c, l, g, i = 0, 0, 0, 0, 0, 0, 0, 0, 0
for _, s := range strings.Split(data, "") { for _, str := range strings.Split(data, "") {
switch s { switch str {
case "_": case "_":
waiting++ waiting++
case "S": case "S":
S++ s++
case "R": case "R":
R++ r++
case "W": case "W":
W++ w++
case "K": case "K":
K++ k++
case "D": case "D":
D++ d++
case "C": case "C":
C++ c++
case "L": case "L":
L++ l++
case "G": case "G":
G++ g++
case "I": case "I":
I++ i++
case ".": case ".":
open++ open++
} }
@ -190,15 +190,15 @@ func (n *Apache) gatherScores(data string) map[string]interface{} {
fields := map[string]interface{}{ fields := map[string]interface{}{
"scboard_waiting": float64(waiting), "scboard_waiting": float64(waiting),
"scboard_starting": float64(S), "scboard_starting": float64(s),
"scboard_reading": float64(R), "scboard_reading": float64(r),
"scboard_sending": float64(W), "scboard_sending": float64(w),
"scboard_keepalive": float64(K), "scboard_keepalive": float64(k),
"scboard_dnslookup": float64(D), "scboard_dnslookup": float64(d),
"scboard_closing": float64(C), "scboard_closing": float64(c),
"scboard_logging": float64(L), "scboard_logging": float64(l),
"scboard_finishing": float64(G), "scboard_finishing": float64(g),
"scboard_idle_cleanup": float64(I), "scboard_idle_cleanup": float64(i),
"scboard_open": float64(open), "scboard_open": float64(open),
} }
return fields return fields

View File

@ -7,10 +7,11 @@ import (
"strings" "strings"
"time" "time"
apcupsdClient "github.com/mdlayher/apcupsd"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/mdlayher/apcupsd"
) )
const defaultAddress = "tcp://127.0.0.1:3551" const defaultAddress = "tcp://127.0.0.1:3551"
@ -42,60 +43,67 @@ func (*ApcUpsd) SampleConfig() string {
func (h *ApcUpsd) Gather(acc telegraf.Accumulator) error { func (h *ApcUpsd) Gather(acc telegraf.Accumulator) error {
ctx := context.Background() ctx := context.Background()
for _, addr := range h.Servers { for _, server := range h.Servers {
addrBits, err := url.Parse(addr) err := func(address string) error {
addrBits, err := url.Parse(address)
if err != nil {
return err
}
if addrBits.Scheme == "" {
addrBits.Scheme = "tcp"
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(h.Timeout))
defer cancel()
status, err := fetchStatus(ctx, addrBits)
if err != nil {
return err
}
tags := map[string]string{
"serial": status.SerialNumber,
"ups_name": status.UPSName,
"status": status.Status,
"model": status.Model,
}
flags, err := strconv.ParseUint(strings.Fields(status.StatusFlags)[0], 0, 64)
if err != nil {
return err
}
fields := map[string]interface{}{
"status_flags": flags,
"input_voltage": status.LineVoltage,
"load_percent": status.LoadPercent,
"battery_charge_percent": status.BatteryChargePercent,
"time_left_ns": status.TimeLeft.Nanoseconds(),
"output_voltage": status.OutputVoltage,
"internal_temp": status.InternalTemp,
"battery_voltage": status.BatteryVoltage,
"input_frequency": status.LineFrequency,
"time_on_battery_ns": status.TimeOnBattery.Nanoseconds(),
"nominal_input_voltage": status.NominalInputVoltage,
"nominal_battery_voltage": status.NominalBatteryVoltage,
"nominal_power": status.NominalPower,
"firmware": status.Firmware,
"battery_date": status.BatteryDate,
}
acc.AddFields("apcupsd", fields, tags)
return nil
}(server)
if err != nil { if err != nil {
return err return err
} }
if addrBits.Scheme == "" {
addrBits.Scheme = "tcp"
}
ctx, cancel := context.WithTimeout(ctx, time.Duration(h.Timeout))
defer cancel()
status, err := fetchStatus(ctx, addrBits)
if err != nil {
return err
}
tags := map[string]string{
"serial": status.SerialNumber,
"ups_name": status.UPSName,
"status": status.Status,
"model": status.Model,
}
flags, err := strconv.ParseUint(strings.Fields(status.StatusFlags)[0], 0, 64)
if err != nil {
return err
}
fields := map[string]interface{}{
"status_flags": flags,
"input_voltage": status.LineVoltage,
"load_percent": status.LoadPercent,
"battery_charge_percent": status.BatteryChargePercent,
"time_left_ns": status.TimeLeft.Nanoseconds(),
"output_voltage": status.OutputVoltage,
"internal_temp": status.InternalTemp,
"battery_voltage": status.BatteryVoltage,
"input_frequency": status.LineFrequency,
"time_on_battery_ns": status.TimeOnBattery.Nanoseconds(),
"nominal_input_voltage": status.NominalInputVoltage,
"nominal_battery_voltage": status.NominalBatteryVoltage,
"nominal_power": status.NominalPower,
"firmware": status.Firmware,
"battery_date": status.BatteryDate,
}
acc.AddFields("apcupsd", fields, tags)
} }
return nil return nil
} }
func fetchStatus(ctx context.Context, addr *url.URL) (*apcupsd.Status, error) { func fetchStatus(ctx context.Context, addr *url.URL) (*apcupsdClient.Status, error) {
client, err := apcupsd.DialContext(ctx, addr.Scheme, addr.Host) client, err := apcupsdClient.DialContext(ctx, addr.Scheme, addr.Host)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -7,9 +7,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
func TestApcupsdDocs(_ *testing.T) { func TestApcupsdDocs(_ *testing.T) {
@ -35,31 +36,33 @@ func listen(ctx context.Context, t *testing.T, out [][]byte) (string, error) {
} }
go func() { go func() {
defer ln.Close()
for ctx.Err() == nil { for ctx.Err() == nil {
defer ln.Close() func() {
conn, err := ln.Accept()
if err != nil {
return
}
defer conn.Close()
require.NoError(t, conn.SetReadDeadline(time.Now().Add(time.Second)))
conn, err := ln.Accept() in := make([]byte, 128)
if err != nil { n, err := conn.Read(in)
continue require.NoError(t, err, "failed to read from connection")
}
defer conn.Close()
require.NoError(t, conn.SetReadDeadline(time.Now().Add(time.Second)))
in := make([]byte, 128) status := []byte{0, 6, 's', 't', 'a', 't', 'u', 's'}
n, err := conn.Read(in) want, got := status, in[:n]
require.NoError(t, err, "failed to read from connection") require.Equal(t, want, got)
status := []byte{0, 6, 's', 't', 'a', 't', 'u', 's'} // Run against test function and append EOF to end of output bytes
want, got := status, in[:n] out = append(out, []byte{0, 0})
require.Equal(t, want, got)
// Run against test function and append EOF to end of output bytes for _, o := range out {
out = append(out, []byte{0, 0}) _, err := conn.Write(o)
require.NoError(t, err, "failed to write to connection")
for _, o := range out { }
_, err := conn.Write(o) }()
require.NoError(t, err, "failed to write to connection")
}
} }
}() }()
@ -137,9 +140,9 @@ func TestApcupsdGather(t *testing.T) {
"time_on_battery_ns": int64(0), "time_on_battery_ns": int64(0),
"nominal_input_voltage": float64(230), "nominal_input_voltage": float64(230),
"nominal_battery_voltage": float64(12), "nominal_battery_voltage": float64(12),
"nominal_power": int(865), "nominal_power": 865,
"firmware": string("857.L3 .I USB FW:L3"), "firmware": "857.L3 .I USB FW:L3",
"battery_date": string("2016-09-06"), "battery_date": "2016-09-06",
}, },
out: genOutput, out: genOutput,
}, },

View File

@ -3,7 +3,7 @@ The Beat plugin will collect metrics from the given Beat instances. It is
known to work with Filebeat and Kafkabeat. known to work with Filebeat and Kafkabeat.
### Configuration: ### Configuration:
```toml ```toml
## An URL from which to read beat-formatted JSON ## An URL from which to read Beat-formatted JSON
## Default is "http://127.0.0.1:5066". ## Default is "http://127.0.0.1:5066".
url = "http://127.0.0.1:5066" url = "http://127.0.0.1:5066"

View File

@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
@ -55,7 +54,7 @@ const description = "Read metrics exposed by Beat"
const suffixInfo = "/" const suffixInfo = "/"
const suffixStats = "/stats" const suffixStats = "/stats"
type BeatInfo struct { type Info struct {
Beat string `json:"beat"` Beat string `json:"beat"`
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
Name string `json:"name"` Name string `json:"name"`
@ -63,7 +62,7 @@ type BeatInfo struct {
Version string `json:"version"` Version string `json:"version"`
} }
type BeatStats struct { type Stats struct {
Beat map[string]interface{} `json:"beat"` Beat map[string]interface{} `json:"beat"`
FileBeat interface{} `json:"filebeat"` FileBeat interface{} `json:"filebeat"`
Libbeat interface{} `json:"libbeat"` Libbeat interface{} `json:"libbeat"`
@ -140,8 +139,8 @@ func (beat *Beat) createHTTPClient() (*http.Client, error) {
} }
// gatherJSONData query the data source and parse the response JSON // gatherJSONData query the data source and parse the response JSON
func (beat *Beat) gatherJSONData(url string, value interface{}) error { func (beat *Beat) gatherJSONData(address string, value interface{}) error {
request, err := http.NewRequest(beat.Method, url, nil) request, err := http.NewRequest(beat.Method, address, nil)
if err != nil { if err != nil {
return err return err
} }
@ -167,8 +166,8 @@ func (beat *Beat) gatherJSONData(url string, value interface{}) error {
} }
func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { func (beat *Beat) Gather(accumulator telegraf.Accumulator) error {
beatStats := &BeatStats{} beatStats := &Stats{}
beatInfo := &BeatInfo{} beatInfo := &Info{}
infoURL, err := url.Parse(beat.URL + suffixInfo) infoURL, err := url.Parse(beat.URL + suffixInfo)
if err != nil { if err != nil {

View File

@ -64,8 +64,8 @@ func addJSONCounter(acc telegraf.Accumulator, commonTags map[string]string, stat
} }
//Add grouped metrics //Add grouped metrics
for _, metric := range grouper.Metrics() { for _, groupedMetric := range grouper.Metrics() {
acc.AddMetric(metric) acc.AddMetric(groupedMetric)
} }
} }
@ -144,8 +144,8 @@ func (b *Bind) addStatsJSON(stats jsonStats, acc telegraf.Accumulator, urlTag st
} }
//Add grouped metrics //Add grouped metrics
for _, metric := range grouper.Metrics() { for _, groupedMetric := range grouper.Metrics() {
acc.AddMetric(metric) acc.AddMetric(groupedMetric)
} }
} }
@ -157,22 +157,30 @@ func (b *Bind) readStatsJSON(addr *url.URL, acc telegraf.Accumulator) error {
// Progressively build up full jsonStats struct by parsing the individual HTTP responses // Progressively build up full jsonStats struct by parsing the individual HTTP responses
for _, suffix := range [...]string{"/server", "/net", "/mem"} { for _, suffix := range [...]string{"/server", "/net", "/mem"} {
scrapeURL := addr.String() + suffix err := func() error {
scrapeURL := addr.String() + suffix
resp, err := b.client.Get(scrapeURL)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status)
}
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return fmt.Errorf("unable to decode JSON blob: %s", err)
}
return nil
}()
resp, err := b.client.Get(scrapeURL)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status)
}
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return fmt.Errorf("Unable to decode JSON blob: %s", err)
}
} }
b.addStatsJSON(stats, acc, addr.Host) b.addStatsJSON(stats, acc, addr.Host)

View File

@ -81,8 +81,8 @@ func addXMLv2Counter(acc telegraf.Accumulator, commonTags map[string]string, sta
} }
//Add grouped metrics //Add grouped metrics
for _, metric := range grouper.Metrics() { for _, groupedMetric := range grouper.Metrics() {
acc.AddMetric(metric) acc.AddMetric(groupedMetric)
} }
} }
@ -103,7 +103,7 @@ func (b *Bind) readStatsXMLv2(addr *url.URL, acc telegraf.Accumulator) error {
} }
if err := xml.NewDecoder(resp.Body).Decode(&stats); err != nil { if err := xml.NewDecoder(resp.Body).Decode(&stats); err != nil {
return fmt.Errorf("Unable to decode XML document: %s", err) return fmt.Errorf("unable to decode XML document: %s", err)
} }
tags := map[string]string{"url": addr.Host} tags := map[string]string{"url": addr.Host}

View File

@ -129,8 +129,8 @@ func (b *Bind) addStatsXMLv3(stats v3Stats, acc telegraf.Accumulator, hostPort s
} }
//Add grouped metrics //Add grouped metrics
for _, metric := range grouper.Metrics() { for _, groupedMetric := range grouper.Metrics() {
acc.AddMetric(metric) acc.AddMetric(groupedMetric)
} }
} }
@ -142,22 +142,30 @@ func (b *Bind) readStatsXMLv3(addr *url.URL, acc telegraf.Accumulator) error {
// Progressively build up full v3Stats struct by parsing the individual HTTP responses // Progressively build up full v3Stats struct by parsing the individual HTTP responses
for _, suffix := range [...]string{"/server", "/net", "/mem"} { for _, suffix := range [...]string{"/server", "/net", "/mem"} {
scrapeURL := addr.String() + suffix err := func() error {
scrapeURL := addr.String() + suffix
resp, err := b.client.Get(scrapeURL)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status)
}
if err := xml.NewDecoder(resp.Body).Decode(&stats); err != nil {
return fmt.Errorf("unable to decode XML document: %s", err)
}
return nil
}()
resp, err := b.client.Get(scrapeURL)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status)
}
if err := xml.NewDecoder(resp.Body).Decode(&stats); err != nil {
return fmt.Errorf("Unable to decode XML document: %s", err)
}
} }
b.addStatsXMLv3(stats, acc, addr.Host) b.addStatsXMLv3(stats, acc, addr.Host)

View File

@ -9,8 +9,9 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
) )
// remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json // remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json
@ -49,7 +50,7 @@ func getHTTPServerBasicAuth() *httptest.Server {
w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`) w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
username, password, authOK := r.BasicAuth() username, password, authOK := r.BasicAuth()
if authOK == false { if !authOK {
http.Error(w, "Not authorized", 401) http.Error(w, "Not authorized", 401)
return return
} }