diff --git a/plugins/inputs/aerospike/README.md b/plugins/inputs/aerospike/README.md index 66fbbe12e..59ff6ed70 100644 --- a/plugins/inputs/aerospike/README.md +++ b/plugins/inputs/aerospike/README.md @@ -28,18 +28,17 @@ All metrics are attempted to be cast to integers, then booleans, then strings. # tls_key = "/etc/telegraf/key.pem" ## If false, skip chain & host verification # insecure_skip_verify = true - + # Feature Options # Add namespace variable to limit the namespaces executed on # Leave blank to do all # disable_query_namespaces = true # default false # namespaces = ["namespace1", "namespace2"] - # Enable set level telmetry + # Enable set level telemetry # query_sets = true # default: false # Add namespace set combinations to limit sets executed on - # Leave blank to do all - # sets = ["namespace1/set1", "namespace1/set2"] + # Leave blank to do all sets # sets = ["namespace1/set1", "namespace1/set2", "namespace3"] # Histograms @@ -48,12 +47,10 @@ All metrics are attempted to be cast to integers, then booleans, then strings. # by default, aerospike produces a 100 bucket histogram # this is not great for most graphing tools, this will allow - # the ability to squash this to a smaller number of buckets + # the ability to squash this to a smaller number of buckets # To have a balanced histogram, the number of buckets chosen # should divide evenly into 100. # num_histogram_buckets = 100 # default: 10 - - ``` ### Measurements: diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index 38674d89a..dd2ff32df 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -10,11 +10,11 @@ import ( "sync" "time" + as "github.com/aerospike/aerospike-client-go" + "github.com/influxdata/telegraf" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" - - as "github.com/aerospike/aerospike-client-go" ) type Aerospike struct { @@ -65,7 +65,7 @@ var sampleConfig = ` # disable_query_namespaces = true # default false # namespaces = ["namespace1", "namespace2"] - # Enable set level telmetry + # Enable set level telemetry # query_sets = true # default: false # Add namespace set combinations to limit sets executed on # Leave blank to do all sets @@ -77,7 +77,9 @@ var sampleConfig = ` # by default, aerospike produces a 100 bucket histogram # this is not great for most graphing tools, this will allow - # the ability to squash this to a smaller number of buckets + # the ability to squash this to a smaller number of buckets + # To have a balanced histogram, the number of buckets chosen + # should divide evenly into 100. # num_histogram_buckets = 100 # default: 10 ` @@ -119,7 +121,7 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error { } if len(a.Servers) == 0 { - return a.gatherServer("127.0.0.1:3000", acc) + return a.gatherServer(acc, "127.0.0.1:3000") } var wg sync.WaitGroup @@ -127,7 +129,7 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error { for _, server := range a.Servers { go func(serv string) { defer wg.Done() - acc.AddError(a.gatherServer(serv, acc)) + acc.AddError(a.gatherServer(acc, serv)) }(server) } @@ -135,7 +137,7 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error { return nil } -func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) error { +func (a *Aerospike) gatherServer(acc telegraf.Accumulator, hostPort string) error { host, port, err := net.SplitHostPort(hostPort) if err != nil { return err @@ -162,7 +164,7 @@ func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) erro if err != nil { return err } - a.parseNodeInfo(stats, hostPort, n.GetName(), acc) + a.parseNodeInfo(acc, stats, hostPort, n.GetName()) namespaces, err := a.getNamespaces(n) if err != nil { @@ -176,18 +178,17 @@ func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) erro if err != nil { continue - } else { - a.parseNamespaceInfo(stats, hostPort, namespace, n.GetName(), acc) } + a.parseNamespaceInfo(acc, stats, hostPort, namespace, n.GetName()) if a.EnableTTLHistogram { - err = a.getTTLHistogram(hostPort, namespace, "", n, acc) + err = a.getTTLHistogram(acc, hostPort, namespace, "", n) if err != nil { continue } } if a.EnableObjectSizeLinearHistogram { - err = a.getObjectSizeLinearHistogram(hostPort, namespace, "", n, acc) + err = a.getObjectSizeLinearHistogram(acc, hostPort, namespace, "", n) if err != nil { continue } @@ -200,24 +201,22 @@ func (a *Aerospike) gatherServer(hostPort string, acc telegraf.Accumulator) erro if err == nil { for _, namespaceSet := range namespaceSets { namespace, set := splitNamespaceSet(namespaceSet) - stats, err := a.getSetInfo(namespaceSet, n) if err != nil { continue - } else { - a.parseSetInfo(stats, hostPort, namespaceSet, n.GetName(), acc) } + a.parseSetInfo(acc, stats, hostPort, namespaceSet, n.GetName()) if a.EnableTTLHistogram { - err = a.getTTLHistogram(hostPort, namespace, set, n, acc) + err = a.getTTLHistogram(acc, hostPort, namespace, set, n) if err != nil { continue } } if a.EnableObjectSizeLinearHistogram { - err = a.getObjectSizeLinearHistogram(hostPort, namespace, set, n, acc) + err = a.getObjectSizeLinearHistogram(acc, hostPort, namespace, set, n) if err != nil { continue } @@ -238,7 +237,7 @@ func (a *Aerospike) getNodeInfo(n *as.Node) (map[string]string, error) { return stats, nil } -func (a *Aerospike) parseNodeInfo(stats map[string]string, hostPort string, nodeName string, acc telegraf.Accumulator) { +func (a *Aerospike) parseNodeInfo(acc telegraf.Accumulator, stats map[string]string, hostPort string, nodeName string) { tags := map[string]string{ "aerospike_host": hostPort, "node_name": nodeName, @@ -275,7 +274,7 @@ func (a *Aerospike) getNamespaceInfo(namespace string, n *as.Node) (map[string]s return stats, err } -func (a *Aerospike) parseNamespaceInfo(stats map[string]string, hostPort string, namespace string, nodeName string, acc telegraf.Accumulator) { +func (a *Aerospike) parseNamespaceInfo(acc telegraf.Accumulator, stats map[string]string, hostPort string, namespace string, nodeName string) { nTags := map[string]string{ "aerospike_host": hostPort, "node_name": nodeName, @@ -341,7 +340,7 @@ func (a *Aerospike) getSetInfo(namespaceSet string, n *as.Node) (map[string]stri return stats, nil } -func (a *Aerospike) parseSetInfo(stats map[string]string, hostPort string, namespaceSet string, nodeName string, acc telegraf.Accumulator) { +func (a *Aerospike) parseSetInfo(acc telegraf.Accumulator, stats map[string]string, hostPort string, namespaceSet string, nodeName string) { stat := strings.Split( strings.TrimSuffix( stats[fmt.Sprintf("sets/%s", namespaceSet)], ";"), ":") @@ -363,22 +362,26 @@ func (a *Aerospike) parseSetInfo(stats map[string]string, hostPort string, names acc.AddFields("aerospike_set", nFields, nTags, time.Now()) } -func (a *Aerospike) getTTLHistogram(hostPort string, namespace string, set string, n *as.Node, acc telegraf.Accumulator) error { +func (a *Aerospike) getTTLHistogram(acc telegraf.Accumulator, hostPort string, namespace string, set string, n *as.Node) error { stats, err := a.getHistogram(namespace, set, "ttl", n) if err != nil { return err } - a.parseHistogram(stats, hostPort, namespace, set, "ttl", n.GetName(), acc) + + nTags := createTags(hostPort, n.GetName(), namespace, set) + a.parseHistogram(acc, stats, nTags, "ttl") return nil } -func (a *Aerospike) getObjectSizeLinearHistogram(hostPort string, namespace string, set string, n *as.Node, acc telegraf.Accumulator) error { +func (a *Aerospike) getObjectSizeLinearHistogram(acc telegraf.Accumulator, hostPort string, namespace string, set string, n *as.Node) error { stats, err := a.getHistogram(namespace, set, "object-size-linear", n) if err != nil { return err } - a.parseHistogram(stats, hostPort, namespace, set, "object-size-linear", n.GetName(), acc) + + nTags := createTags(hostPort, n.GetName(), namespace, set) + a.parseHistogram(acc, stats, nTags, "object-size-linear") return nil } @@ -398,17 +401,7 @@ func (a *Aerospike) getHistogram(namespace string, set string, histogramType str return stats, nil } -func (a *Aerospike) parseHistogram(stats map[string]string, hostPort string, namespace string, set string, histogramType string, nodeName string, acc telegraf.Accumulator) { - nTags := map[string]string{ - "aerospike_host": hostPort, - "node_name": nodeName, - "namespace": namespace, - } - - if len(set) > 0 { - nTags["set"] = set - } - +func (a *Aerospike) parseHistogram(acc telegraf.Accumulator, stats map[string]string, nTags map[string]string, histogramType string) { nFields := make(map[string]interface{}) for _, stat := range stats { @@ -421,7 +414,7 @@ func (a *Aerospike) parseHistogram(stats map[string]string, hostPort string, nam if pieces[0] == "buckets" { buckets := strings.Split(pieces[1], ",") - // Normalize incase of less buckets than expected + // Normalize in case of less buckets than expected numRecordsPerBucket := 1 if len(buckets) > a.NumberHistogramBuckets { numRecordsPerBucket = int(math.Ceil(float64(len(buckets)) / float64(a.NumberHistogramBuckets))) @@ -458,7 +451,7 @@ func (a *Aerospike) parseHistogram(stats map[string]string, hostPort string, nam acc.AddFields(fmt.Sprintf("aerospike_histogram_%v", strings.Replace(histogramType, "-", "_", -1)), nFields, nTags, time.Now()) } -func splitNamespaceSet(namespaceSet string) (string, string) { +func splitNamespaceSet(namespaceSet string) (namespace string, set string) { split := strings.Split(namespaceSet, "/") return split[0], split[1] } @@ -478,6 +471,19 @@ func parseAerospikeValue(key string, v string) interface{} { } } +func createTags(hostPort string, nodeName string, namespace string, set string) map[string]string { + nTags := map[string]string{ + "aerospike_host": hostPort, + "node_name": nodeName, + "namespace": namespace, + } + + if len(set) > 0 { + nTags["set"] = set + } + return nTags +} + func init() { inputs.Add("aerospike", func() telegraf.Input { return &Aerospike{} diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index 57d37a06c..ab93d4e2a 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -4,9 +4,9 @@ import ( "testing" as "github.com/aerospike/aerospike-client-go" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestAerospikeStatisticsIntegration(t *testing.T) { @@ -23,14 +23,14 @@ func TestAerospikeStatisticsIntegration(t *testing.T) { err := acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("aerospike_node")) - assert.True(t, acc.HasTag("aerospike_node", "node_name")) - assert.True(t, acc.HasMeasurement("aerospike_namespace")) - assert.True(t, acc.HasTag("aerospike_namespace", "node_name")) - assert.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error")) + require.True(t, acc.HasMeasurement("aerospike_node")) + require.True(t, acc.HasTag("aerospike_node", "node_name")) + require.True(t, acc.HasMeasurement("aerospike_namespace")) + require.True(t, acc.HasTag("aerospike_namespace", "node_name")) + require.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error")) namespaceName := acc.TagValue("aerospike_namespace", "namespace") - assert.Equal(t, namespaceName, "test") + require.Equal(t, "test", namespaceName) } func TestAerospikeStatisticsPartialErrIntegration(t *testing.T) { @@ -50,14 +50,14 @@ func TestAerospikeStatisticsPartialErrIntegration(t *testing.T) { require.Error(t, err) - assert.True(t, acc.HasMeasurement("aerospike_node")) - assert.True(t, acc.HasMeasurement("aerospike_namespace")) - assert.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error")) + require.True(t, acc.HasMeasurement("aerospike_node")) + require.True(t, acc.HasMeasurement("aerospike_namespace")) + require.True(t, acc.HasInt64Field("aerospike_node", "batch_index_error")) namespaceName := acc.TagSetValue("aerospike_namespace", "namespace") - assert.Equal(t, namespaceName, "test") + require.Equal(t, "test", namespaceName) } -func TestSelectNamepsacesIntegration(t *testing.T) { +func TestSelectNamespacesIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping aerospike integration tests.") } @@ -73,10 +73,10 @@ func TestSelectNamepsacesIntegration(t *testing.T) { err := acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("aerospike_node")) - assert.True(t, acc.HasTag("aerospike_node", "node_name")) - assert.True(t, acc.HasMeasurement("aerospike_namespace")) - assert.True(t, acc.HasTag("aerospike_namespace", "node_name")) + require.True(t, acc.HasMeasurement("aerospike_node")) + require.True(t, acc.HasTag("aerospike_node", "node_name")) + require.True(t, acc.HasMeasurement("aerospike_namespace")) + require.True(t, acc.HasTag("aerospike_namespace", "node_name")) // Expect only 1 namespace count := 0 @@ -85,10 +85,10 @@ func TestSelectNamepsacesIntegration(t *testing.T) { count++ } } - assert.Equal(t, count, 1) + require.Equal(t, 1, count) // expect namespace to have no fields as nonexistent - assert.False(t, acc.HasInt64Field("aerospke_namespace", "appeals_tx_remaining")) + require.False(t, acc.HasInt64Field("aerospke_namespace", "appeals_tx_remaining")) } func TestDisableQueryNamespacesIntegration(t *testing.T) { @@ -107,15 +107,15 @@ func TestDisableQueryNamespacesIntegration(t *testing.T) { err := acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("aerospike_node")) - assert.False(t, acc.HasMeasurement("aerospike_namespace")) + require.True(t, acc.HasMeasurement("aerospike_node")) + require.False(t, acc.HasMeasurement("aerospike_namespace")) a.DisableQueryNamespaces = false err = acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("aerospike_node")) - assert.True(t, acc.HasMeasurement("aerospike_namespace")) + require.True(t, acc.HasMeasurement("aerospike_node")) + require.True(t, acc.HasMeasurement("aerospike_namespace")) } func TestQuerySetsIntegration(t *testing.T) { @@ -127,6 +127,7 @@ func TestQuerySetsIntegration(t *testing.T) { // test is the default namespace from aerospike policy := as.NewClientPolicy() client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000) + require.NoError(t, err) key, err := as.NewKey("test", "foo", 123) require.NoError(t, err) @@ -158,12 +159,12 @@ func TestQuerySetsIntegration(t *testing.T) { err = acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo")) - assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar")) + require.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo")) + require.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar")) - assert.True(t, acc.HasMeasurement("aerospike_set")) - assert.True(t, acc.HasTag("aerospike_set", "set")) - assert.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes")) + require.True(t, acc.HasMeasurement("aerospike_set")) + require.True(t, acc.HasTag("aerospike_set", "set")) + require.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes")) } func TestSelectQuerySetsIntegration(t *testing.T) { @@ -175,6 +176,7 @@ func TestSelectQuerySetsIntegration(t *testing.T) { // test is the default namespace from aerospike policy := as.NewClientPolicy() client, err := as.NewClientWithPolicy(policy, testutil.GetLocalHost(), 3000) + require.NoError(t, err) key, err := as.NewKey("test", "foo", 123) require.NoError(t, err) @@ -207,12 +209,12 @@ func TestSelectQuerySetsIntegration(t *testing.T) { err = acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo")) - assert.False(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar")) + require.True(t, FindTagValue(&acc, "aerospike_set", "set", "test/foo")) + require.False(t, FindTagValue(&acc, "aerospike_set", "set", "test/bar")) - assert.True(t, acc.HasMeasurement("aerospike_set")) - assert.True(t, acc.HasTag("aerospike_set", "set")) - assert.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes")) + require.True(t, acc.HasMeasurement("aerospike_set")) + require.True(t, acc.HasTag("aerospike_set", "set")) + require.True(t, acc.HasInt64Field("aerospike_set", "memory_data_bytes")) } func TestDisableTTLHistogramIntegration(t *testing.T) { @@ -233,7 +235,7 @@ func TestDisableTTLHistogramIntegration(t *testing.T) { err := acc.GatherError(a.Gather) require.NoError(t, err) - assert.False(t, acc.HasMeasurement("aerospike_histogram_ttl")) + require.False(t, acc.HasMeasurement("aerospike_histogram_ttl")) } func TestTTLHistogramIntegration(t *testing.T) { if testing.Short() { @@ -250,7 +252,7 @@ func TestTTLHistogramIntegration(t *testing.T) { } /* Produces histogram - Measurment exists + Measurement exists Has appropriate tags (node name etc) Has appropriate keys (time:value) may be able to leverage histogram plugin @@ -259,8 +261,8 @@ func TestTTLHistogramIntegration(t *testing.T) { err := acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("aerospike_histogram_ttl")) - assert.True(t, FindTagValue(&acc, "aerospike_histogram_ttl", "namespace", "test")) + require.True(t, acc.HasMeasurement("aerospike_histogram_ttl")) + require.True(t, FindTagValue(&acc, "aerospike_histogram_ttl", "namespace", "test")) } func TestDisableObjectSizeLinearHistogramIntegration(t *testing.T) { if testing.Short() { @@ -280,7 +282,7 @@ func TestDisableObjectSizeLinearHistogramIntegration(t *testing.T) { err := acc.GatherError(a.Gather) require.NoError(t, err) - assert.False(t, acc.HasMeasurement("aerospike_histogram_object_size_linear")) + require.False(t, acc.HasMeasurement("aerospike_histogram_object_size_linear")) } func TestObjectSizeLinearHistogramIntegration(t *testing.T) { if testing.Short() { @@ -297,7 +299,7 @@ func TestObjectSizeLinearHistogramIntegration(t *testing.T) { } /* Produces histogram - Measurment exists + Measurement exists Has appropriate tags (node name etc) Has appropriate keys (time:value) @@ -305,8 +307,8 @@ func TestObjectSizeLinearHistogramIntegration(t *testing.T) { var acc testutil.Accumulator err := acc.GatherError(a.Gather) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("aerospike_histogram_object_size_linear")) - assert.True(t, FindTagValue(&acc, "aerospike_histogram_object_size_linear", "namespace", "test")) + require.True(t, acc.HasMeasurement("aerospike_histogram_object_size_linear")) + require.True(t, FindTagValue(&acc, "aerospike_histogram_object_size_linear", "namespace", "test")) } func TestParseNodeInfo(t *testing.T) { @@ -330,7 +332,7 @@ func TestParseNodeInfo(t *testing.T) { "node_name": "TestNodeName", } - a.parseNodeInfo(stats, "127.0.0.1:3000", "TestNodeName", &acc) + a.parseNodeInfo(&acc, stats, "127.0.0.1:3000", "TestNodeName") acc.AssertContainsTaggedFields(t, "aerospike_node", expectedFields, expectedTags) } @@ -356,7 +358,7 @@ func TestParseNamespaceInfo(t *testing.T) { "namespace": "test", } - a.parseNamespaceInfo(stats, "127.0.0.1:3000", "test", "TestNodeName", &acc) + a.parseNamespaceInfo(&acc, stats, "127.0.0.1:3000", "test", "TestNodeName") acc.AssertContainsTaggedFields(t, "aerospike_namespace", expectedFields, expectedTags) } @@ -380,7 +382,7 @@ func TestParseSetInfo(t *testing.T) { "node_name": "TestNodeName", "set": "test/foo", } - a.parseSetInfo(stats, "127.0.0.1:3000", "test/foo", "TestNodeName", &acc) + a.parseSetInfo(&acc, stats, "127.0.0.1:3000", "test/foo", "TestNodeName") acc.AssertContainsTaggedFields(t, "aerospike_set", expectedFields, expectedTags) } @@ -412,7 +414,8 @@ func TestParseHistogramSet(t *testing.T) { "set": "foo", } - a.parseHistogram(stats, "127.0.0.1:3000", "test", "foo", "object-size-linear", "TestNodeName", &acc) + nTags := createTags("127.0.0.1:3000", "TestNodeName", "test", "foo") + a.parseHistogram(&acc, stats, nTags, "object-size-linear") acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags) } func TestParseHistogramNamespace(t *testing.T) { @@ -442,16 +445,17 @@ func TestParseHistogramNamespace(t *testing.T) { "namespace": "test", } - a.parseHistogram(stats, "127.0.0.1:3000", "test", "", "object-size-linear", "TestNodeName", &acc) + nTags := createTags("127.0.0.1:3000", "TestNodeName", "test", "") + a.parseHistogram(&acc, stats, nTags, "object-size-linear") acc.AssertContainsTaggedFields(t, "aerospike_histogram_object_size_linear", expectedFields, expectedTags) } func TestAerospikeParseValue(t *testing.T) { // uint64 with value bigger than int64 max val := parseAerospikeValue("", "18446744041841121751") - require.Equal(t, val, uint64(18446744041841121751)) + require.Equal(t, uint64(18446744041841121751), val) val = parseAerospikeValue("", "true") - require.Equal(t, val, true) + require.Equal(t, true, val) // int values val = parseAerospikeValue("", "42") diff --git a/plugins/inputs/aliyuncms/aliyuncms.go b/plugins/inputs/aliyuncms/aliyuncms.go index 3b521579b..ac70b9a44 100644 --- a/plugins/inputs/aliyuncms/aliyuncms.go +++ b/plugins/inputs/aliyuncms/aliyuncms.go @@ -11,13 +11,14 @@ import ( "github.com/aliyun/alibaba-cloud-sdk-go/sdk" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers" "github.com/aliyun/alibaba-cloud-sdk-go/services/cms" + "github.com/jmespath/go-jmespath" + "github.com/pkg/errors" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/jmespath/go-jmespath" - "github.com/pkg/errors" ) const ( @@ -161,7 +162,7 @@ type ( dtLock sync.Mutex //Guard for discoveryTags & dimensions discoveryTags map[string]map[string]string //Internal data structure that can enrich metrics with tags dimensionsUdObj map[string]string - dimensionsUdArr []map[string]string //Parsed Dimesnsions JSON string (unmarshalled) + dimensionsUdArr []map[string]string //Parsed Dimensions JSON string (unmarshalled) requestDimensions []map[string]string //this is the actual dimensions list that would be used in API request requestDimensionsStr string //String representation of the above @@ -239,7 +240,7 @@ func (s *AliyunCMS) Init() error { //Init discovery... if s.dt == nil { //Support for tests - s.dt, err = NewDiscoveryTool(s.DiscoveryRegions, s.Project, s.Log, credential, int(float32(s.RateLimit)*0.2), time.Duration(s.DiscoveryInterval)) + s.dt, err = newDiscoveryTool(s.DiscoveryRegions, s.Project, s.Log, credential, int(float32(s.RateLimit)*0.2), time.Duration(s.DiscoveryInterval)) if err != nil { s.Log.Errorf("Discovery tool is not activated: %v", err) s.dt = nil @@ -395,8 +396,8 @@ func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, me } //Tag helper -func parseTag(tagSpec string, data interface{}) (string, string, error) { - tagKey := tagSpec +func parseTag(tagSpec string, data interface{}) (tagKey string, tagValue string, err error) { + tagKey = tagSpec queryPath := tagSpec //Split query path to tagKey and query path diff --git a/plugins/inputs/aliyuncms/aliyuncms_test.go b/plugins/inputs/aliyuncms/aliyuncms_test.go index b9028c8ba..a2bae5d0d 100644 --- a/plugins/inputs/aliyuncms/aliyuncms_test.go +++ b/plugins/inputs/aliyuncms/aliyuncms_test.go @@ -12,12 +12,13 @@ import ( "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses" "github.com/aliyun/alibaba-cloud-sdk-go/services/cms" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/testutil" - "github.com/pkg/errors" - "github.com/stretchr/testify/require" ) const inputTitle = "inputs.aliyuncms" @@ -95,7 +96,7 @@ func getDiscoveryTool(project string, discoverRegions []string) (*discoveryTool, return nil, errors.Errorf("failed to retrieve credential: %v", err) } - dt, err := NewDiscoveryTool(discoverRegions, project, testutil.Logger{Name: inputTitle}, credential, 1, time.Minute*2) + dt, err := newDiscoveryTool(discoverRegions, project, testutil.Logger{Name: inputTitle}, credential, 1, time.Minute*2) if err != nil { return nil, errors.Errorf("Can't create discovery tool object: %v", err) diff --git a/plugins/inputs/aliyuncms/discovery.go b/plugins/inputs/aliyuncms/discovery.go index 7e33d7f92..c3f35c78a 100644 --- a/plugins/inputs/aliyuncms/discovery.go +++ b/plugins/inputs/aliyuncms/discovery.go @@ -2,7 +2,6 @@ package aliyuncms import ( "encoding/json" - "github.com/influxdata/telegraf" "reflect" "regexp" "strconv" @@ -17,8 +16,10 @@ import ( "github.com/aliyun/alibaba-cloud-sdk-go/services/rds" "github.com/aliyun/alibaba-cloud-sdk-go/services/slb" "github.com/aliyun/alibaba-cloud-sdk-go/services/vpc" - "github.com/influxdata/telegraf/internal/limiter" "github.com/pkg/errors" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/limiter" ) // https://www.alibabacloud.com/help/doc-detail/40654.htm?gclid=Cj0KCQjw4dr0BRCxARIsAKUNjWTAMfyVUn_Y3OevFBV3CMaazrhq0URHsgE7c0m0SeMQRKlhlsJGgIEaAviyEALw_wcB @@ -69,6 +70,13 @@ type discoveryTool struct { lg telegraf.Logger //Telegraf logger (should be provided) } +type response struct { + discData []interface{} + totalCount int + pageSize int + pageNumber int +} + //getRPCReqFromDiscoveryRequest - utility function to map between aliyun request primitives //discoveryRequest represents different type of discovery requests func getRPCReqFromDiscoveryRequest(req discoveryRequest) (*requests.RpcRequest, error) { @@ -97,13 +105,13 @@ func getRPCReqFromDiscoveryRequest(req discoveryRequest) (*requests.RpcRequest, return nil, errors.Errorf("Didn't find *requests.RpcRequest embedded struct in %q", ptrV.Type()) } -//NewDiscoveryTool function returns discovery tool object. +//newDiscoveryTool function returns discovery tool object. //The object is used to periodically get data about aliyun objects and send this //data into channel. The intention is to enrich reported metrics with discovery data. //Discovery is supported for a limited set of object types (defined by project) and can be extended in future. //Discovery can be limited by region if not set, then all regions is queried. //Request against API can inquire additional costs, consult with aliyun API documentation. -func NewDiscoveryTool(regions []string, project string, lg telegraf.Logger, credential auth.Credential, rateLimit int, discoveryInterval time.Duration) (*discoveryTool, error) { +func newDiscoveryTool(regions []string, project string, lg telegraf.Logger, credential auth.Credential, rateLimit int, discoveryInterval time.Duration) (*discoveryTool, error) { var ( dscReq = map[string]discoveryRequest{} cli = map[string]aliyunSdkClient{} @@ -292,22 +300,22 @@ func NewDiscoveryTool(regions []string, project string, lg telegraf.Logger, cred }, nil } -func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (discData []interface{}, totalCount int, pageSize int, pageNumber int, err error) { +func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (discoveryResponse *response, err error) { var ( - fullOutput = map[string]interface{}{} - data []byte - foundDataItem bool - foundRootKey bool + fullOutput = map[string]interface{}{} + foundDataItem, foundRootKey bool + discData []interface{} + totalCount, pageSize, pageNumber int ) - data = resp.GetHttpContentBytes() + data := resp.GetHttpContentBytes() if data == nil { //No data - return nil, 0, 0, 0, errors.Errorf("No data in response to be parsed") + return nil, errors.Errorf("No data in response to be parsed") } err = json.Unmarshal(data, &fullOutput) if err != nil { - return nil, 0, 0, 0, errors.Errorf("Can't parse JSON from discovery response: %v", err) + return nil, errors.Errorf("Can't parse JSON from discovery response: %v", err) } for key, val := range fullOutput { @@ -316,7 +324,7 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) foundRootKey = true rootKeyVal, ok := val.(map[string]interface{}) if !ok { - return nil, 0, 0, 0, errors.Errorf("Content of root key %q, is not an object: %v", key, val) + return nil, errors.Errorf("Content of root key %q, is not an object: %v", key, val) } //It should contain the array with discovered data @@ -326,7 +334,7 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) } } if !foundDataItem { - return nil, 0, 0, 0, errors.Errorf("Didn't find array item in root key %q", key) + return nil, errors.Errorf("Didn't find array item in root key %q", key) } case "TotalCount": totalCount = int(val.(float64)) @@ -337,55 +345,54 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) } } if !foundRootKey { - return nil, 0, 0, 0, errors.Errorf("Didn't find root key %q in discovery response", dt.respRootKey) + return nil, errors.Errorf("Didn't find root key %q in discovery response", dt.respRootKey) } - return + return &response{ + discData: discData, + totalCount: totalCount, + pageSize: pageSize, + pageNumber: pageNumber, + }, nil } -func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, limiter chan bool) (map[string]interface{}, error) { - var ( - err error - resp *responses.CommonResponse - data []interface{} - discoveryData []interface{} - totalCount int - pageNumber int - ) +func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, limiterChan chan bool) (map[string]interface{}, error) { + var discoveryData []interface{} + defer delete(req.QueryParams, "PageNumber") for { - if limiter != nil { - <-limiter //Rate limiting + if limiterChan != nil { + <-limiterChan //Rate limiting } - resp, err = cli.ProcessCommonRequest(req) + resp, err := cli.ProcessCommonRequest(req) if err != nil { return nil, err } - data, totalCount, _, pageNumber, err = dt.parseDiscoveryResponse(resp) + discoveryResponse, err := dt.parseDiscoveryResponse(resp) if err != nil { return nil, err } - discoveryData = append(discoveryData, data...) + discoveryData = append(discoveryData, discoveryResponse.discData...) //Pagination - pageNumber++ - req.QueryParams["PageNumber"] = strconv.Itoa(pageNumber) + discoveryResponse.pageNumber++ + req.QueryParams["PageNumber"] = strconv.Itoa(discoveryResponse.pageNumber) - if len(discoveryData) == totalCount { //All data received + if len(discoveryData) == discoveryResponse.totalCount { //All data received //Map data to appropriate shape before return preparedData := map[string]interface{}{} for _, raw := range discoveryData { - if elem, ok := raw.(map[string]interface{}); ok { - if objectID, ok := elem[dt.respObjectIDKey].(string); ok { - preparedData[objectID] = elem - } - } else { + elem, ok := raw.(map[string]interface{}) + if !ok { return nil, errors.Errorf("Can't parse input data element, not a map[string]interface{} type") } + if objectID, ok := elem[dt.respObjectIDKey].(string); ok { + preparedData[objectID] = elem + } } return preparedData, nil @@ -393,7 +400,7 @@ func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.Com } } -func (dt *discoveryTool) getDiscoveryDataAllRegions(limiter chan bool) (map[string]interface{}, error) { +func (dt *discoveryTool) getDiscoveryDataAllRegions(limiterChan chan bool) (map[string]interface{}, error) { var ( data map[string]interface{} resultData = map[string]interface{}{} @@ -424,7 +431,7 @@ func (dt *discoveryTool) getDiscoveryDataAllRegions(limiter chan bool) (map[stri commonRequest.TransToAcsRequest() //Get discovery data using common request - data, err = dt.getDiscoveryData(cli, commonRequest, limiter) + data, err = dt.getDiscoveryData(cli, commonRequest, limiterChan) if err != nil { return nil, err } diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index 8ef6d6fe2..ff417eb26 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -43,7 +43,7 @@ The following defaults are known to work with RabbitMQ: # exchange_arguments = { } # exchange_arguments = {"hash_property" = "timestamp"} - ## AMQP queue name + ## AMQP queue name. queue = "telegraf" ## AMQP queue durability can be "transient" or "durable". diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 39bfeeaed..abe86bc38 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -9,12 +9,13 @@ import ( "sync" "time" + "github.com/streadway/amqp" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" - "github.com/streadway/amqp" ) const ( @@ -183,7 +184,7 @@ func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { // make new tls config - tls, err := a.ClientConfig.TLSConfig() + tlsCfg, err := a.ClientConfig.TLSConfig() if err != nil { return nil, err } @@ -201,7 +202,7 @@ func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { } config := amqp.Config{ - TLSClientConfig: tls, + TLSClientConfig: tlsCfg, SASL: auth, // if nil, it will be PLAIN } return &config, nil @@ -292,12 +293,9 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err } if a.Exchange != "" { - var exchangeDurable = true - switch a.ExchangeDurability { - case "transient": + exchangeDurable := true + if a.ExchangeDurability == "transient" { exchangeDurable = false - default: - exchangeDurable = true } exchangeArgs := make(amqp.Table, len(a.ExchangeArguments)) @@ -305,11 +303,8 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err exchangeArgs[k] = v } - err = declareExchange( + err = a.declareExchange( ch, - a.Exchange, - a.ExchangeType, - a.ExchangePassive, exchangeDurable, exchangeArgs) if err != nil { @@ -317,11 +312,7 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err } } - q, err := declareQueue( - ch, - a.Queue, - a.QueueDurability, - a.QueuePassive) + q, err := a.declareQueue(ch) if err != nil { return nil, err } @@ -364,19 +355,16 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err return msgs, err } -func declareExchange( +func (a *AMQPConsumer) declareExchange( channel *amqp.Channel, - exchangeName string, - exchangeType string, - exchangePassive bool, exchangeDurable bool, exchangeArguments amqp.Table, ) error { var err error - if exchangePassive { + if a.ExchangePassive { err = channel.ExchangeDeclarePassive( - exchangeName, - exchangeType, + a.Exchange, + a.ExchangeType, exchangeDurable, false, // delete when unused false, // internal @@ -385,8 +373,8 @@ func declareExchange( ) } else { err = channel.ExchangeDeclare( - exchangeName, - exchangeType, + a.Exchange, + a.ExchangeType, exchangeDurable, false, // delete when unused false, // internal @@ -400,26 +388,18 @@ func declareExchange( return nil } -func declareQueue( - channel *amqp.Channel, - queueName string, - queueDurability string, - queuePassive bool, -) (*amqp.Queue, error) { +func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error) { var queue amqp.Queue var err error - var queueDurable = true - switch queueDurability { - case "transient": + queueDurable := true + if a.QueueDurability == "transient" { queueDurable = false - default: - queueDurable = true } - if queuePassive { + if a.QueuePassive { queue, err = channel.QueueDeclarePassive( - queueName, // queue + a.Queue, // queue queueDurable, // durable false, // delete when unused false, // exclusive @@ -428,7 +408,7 @@ func declareQueue( ) } else { queue, err = channel.QueueDeclare( - queueName, // queue + a.Queue, // queue queueDurable, // durable false, // delete when unused false, // exclusive diff --git a/plugins/inputs/apache/apache.go b/plugins/inputs/apache/apache.go index 429d1cb9e..9b9059ac8 100644 --- a/plugins/inputs/apache/apache.go +++ b/plugins/inputs/apache/apache.go @@ -158,31 +158,31 @@ func (n *Apache) gatherURL(addr *url.URL, acc telegraf.Accumulator) error { } func (n *Apache) gatherScores(data string) map[string]interface{} { - var waiting, open int = 0, 0 - var S, R, W, K, D, C, L, G, I int = 0, 0, 0, 0, 0, 0, 0, 0, 0 + var waiting, open = 0, 0 + var s, r, w, k, d, c, l, g, i = 0, 0, 0, 0, 0, 0, 0, 0, 0 - for _, s := range strings.Split(data, "") { - switch s { + for _, str := range strings.Split(data, "") { + switch str { case "_": waiting++ case "S": - S++ + s++ case "R": - R++ + r++ case "W": - W++ + w++ case "K": - K++ + k++ case "D": - D++ + d++ case "C": - C++ + c++ case "L": - L++ + l++ case "G": - G++ + g++ case "I": - I++ + i++ case ".": open++ } @@ -190,15 +190,15 @@ func (n *Apache) gatherScores(data string) map[string]interface{} { fields := map[string]interface{}{ "scboard_waiting": float64(waiting), - "scboard_starting": float64(S), - "scboard_reading": float64(R), - "scboard_sending": float64(W), - "scboard_keepalive": float64(K), - "scboard_dnslookup": float64(D), - "scboard_closing": float64(C), - "scboard_logging": float64(L), - "scboard_finishing": float64(G), - "scboard_idle_cleanup": float64(I), + "scboard_starting": float64(s), + "scboard_reading": float64(r), + "scboard_sending": float64(w), + "scboard_keepalive": float64(k), + "scboard_dnslookup": float64(d), + "scboard_closing": float64(c), + "scboard_logging": float64(l), + "scboard_finishing": float64(g), + "scboard_idle_cleanup": float64(i), "scboard_open": float64(open), } return fields diff --git a/plugins/inputs/apcupsd/apcupsd.go b/plugins/inputs/apcupsd/apcupsd.go index b41a91b82..2cb752298 100644 --- a/plugins/inputs/apcupsd/apcupsd.go +++ b/plugins/inputs/apcupsd/apcupsd.go @@ -7,10 +7,11 @@ import ( "strings" "time" + apcupsdClient "github.com/mdlayher/apcupsd" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/mdlayher/apcupsd" ) const defaultAddress = "tcp://127.0.0.1:3551" @@ -42,60 +43,67 @@ func (*ApcUpsd) SampleConfig() string { func (h *ApcUpsd) Gather(acc telegraf.Accumulator) error { ctx := context.Background() - for _, addr := range h.Servers { - addrBits, err := url.Parse(addr) + for _, server := range h.Servers { + err := func(address string) error { + addrBits, err := url.Parse(address) + if err != nil { + return err + } + if addrBits.Scheme == "" { + addrBits.Scheme = "tcp" + } + + ctx, cancel := context.WithTimeout(ctx, time.Duration(h.Timeout)) + defer cancel() + + status, err := fetchStatus(ctx, addrBits) + if err != nil { + return err + } + + tags := map[string]string{ + "serial": status.SerialNumber, + "ups_name": status.UPSName, + "status": status.Status, + "model": status.Model, + } + + flags, err := strconv.ParseUint(strings.Fields(status.StatusFlags)[0], 0, 64) + if err != nil { + return err + } + + fields := map[string]interface{}{ + "status_flags": flags, + "input_voltage": status.LineVoltage, + "load_percent": status.LoadPercent, + "battery_charge_percent": status.BatteryChargePercent, + "time_left_ns": status.TimeLeft.Nanoseconds(), + "output_voltage": status.OutputVoltage, + "internal_temp": status.InternalTemp, + "battery_voltage": status.BatteryVoltage, + "input_frequency": status.LineFrequency, + "time_on_battery_ns": status.TimeOnBattery.Nanoseconds(), + "nominal_input_voltage": status.NominalInputVoltage, + "nominal_battery_voltage": status.NominalBatteryVoltage, + "nominal_power": status.NominalPower, + "firmware": status.Firmware, + "battery_date": status.BatteryDate, + } + + acc.AddFields("apcupsd", fields, tags) + return nil + }(server) + if err != nil { return err } - if addrBits.Scheme == "" { - addrBits.Scheme = "tcp" - } - - ctx, cancel := context.WithTimeout(ctx, time.Duration(h.Timeout)) - defer cancel() - - status, err := fetchStatus(ctx, addrBits) - if err != nil { - return err - } - - tags := map[string]string{ - "serial": status.SerialNumber, - "ups_name": status.UPSName, - "status": status.Status, - "model": status.Model, - } - - flags, err := strconv.ParseUint(strings.Fields(status.StatusFlags)[0], 0, 64) - if err != nil { - return err - } - - fields := map[string]interface{}{ - "status_flags": flags, - "input_voltage": status.LineVoltage, - "load_percent": status.LoadPercent, - "battery_charge_percent": status.BatteryChargePercent, - "time_left_ns": status.TimeLeft.Nanoseconds(), - "output_voltage": status.OutputVoltage, - "internal_temp": status.InternalTemp, - "battery_voltage": status.BatteryVoltage, - "input_frequency": status.LineFrequency, - "time_on_battery_ns": status.TimeOnBattery.Nanoseconds(), - "nominal_input_voltage": status.NominalInputVoltage, - "nominal_battery_voltage": status.NominalBatteryVoltage, - "nominal_power": status.NominalPower, - "firmware": status.Firmware, - "battery_date": status.BatteryDate, - } - - acc.AddFields("apcupsd", fields, tags) } return nil } -func fetchStatus(ctx context.Context, addr *url.URL) (*apcupsd.Status, error) { - client, err := apcupsd.DialContext(ctx, addr.Scheme, addr.Host) +func fetchStatus(ctx context.Context, addr *url.URL) (*apcupsdClient.Status, error) { + client, err := apcupsdClient.DialContext(ctx, addr.Scheme, addr.Host) if err != nil { return nil, err } diff --git a/plugins/inputs/apcupsd/apcupsd_test.go b/plugins/inputs/apcupsd/apcupsd_test.go index dd3c986af..d2baca296 100644 --- a/plugins/inputs/apcupsd/apcupsd_test.go +++ b/plugins/inputs/apcupsd/apcupsd_test.go @@ -7,9 +7,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func TestApcupsdDocs(_ *testing.T) { @@ -35,31 +36,33 @@ func listen(ctx context.Context, t *testing.T, out [][]byte) (string, error) { } go func() { + defer ln.Close() + for ctx.Err() == nil { - defer ln.Close() + func() { + conn, err := ln.Accept() + if err != nil { + return + } + defer conn.Close() + require.NoError(t, conn.SetReadDeadline(time.Now().Add(time.Second))) - conn, err := ln.Accept() - if err != nil { - continue - } - defer conn.Close() - require.NoError(t, conn.SetReadDeadline(time.Now().Add(time.Second))) + in := make([]byte, 128) + n, err := conn.Read(in) + require.NoError(t, err, "failed to read from connection") - in := make([]byte, 128) - n, err := conn.Read(in) - require.NoError(t, err, "failed to read from connection") + status := []byte{0, 6, 's', 't', 'a', 't', 'u', 's'} + want, got := status, in[:n] + require.Equal(t, want, got) - status := []byte{0, 6, 's', 't', 'a', 't', 'u', 's'} - want, got := status, in[:n] - require.Equal(t, want, got) + // Run against test function and append EOF to end of output bytes + out = append(out, []byte{0, 0}) - // Run against test function and append EOF to end of output bytes - out = append(out, []byte{0, 0}) - - for _, o := range out { - _, err := conn.Write(o) - require.NoError(t, err, "failed to write to connection") - } + for _, o := range out { + _, err := conn.Write(o) + require.NoError(t, err, "failed to write to connection") + } + }() } }() @@ -137,9 +140,9 @@ func TestApcupsdGather(t *testing.T) { "time_on_battery_ns": int64(0), "nominal_input_voltage": float64(230), "nominal_battery_voltage": float64(12), - "nominal_power": int(865), - "firmware": string("857.L3 .I USB FW:L3"), - "battery_date": string("2016-09-06"), + "nominal_power": 865, + "firmware": "857.L3 .I USB FW:L3", + "battery_date": "2016-09-06", }, out: genOutput, }, diff --git a/plugins/inputs/beat/README.md b/plugins/inputs/beat/README.md index a3ef9b1b8..d819b5ab9 100644 --- a/plugins/inputs/beat/README.md +++ b/plugins/inputs/beat/README.md @@ -3,7 +3,7 @@ The Beat plugin will collect metrics from the given Beat instances. It is known to work with Filebeat and Kafkabeat. ### Configuration: ```toml - ## An URL from which to read beat-formatted JSON + ## An URL from which to read Beat-formatted JSON ## Default is "http://127.0.0.1:5066". url = "http://127.0.0.1:5066" diff --git a/plugins/inputs/beat/beat.go b/plugins/inputs/beat/beat.go index 2d57a6dea..08b5c3851 100644 --- a/plugins/inputs/beat/beat.go +++ b/plugins/inputs/beat/beat.go @@ -12,7 +12,6 @@ import ( "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" - jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) @@ -55,7 +54,7 @@ const description = "Read metrics exposed by Beat" const suffixInfo = "/" const suffixStats = "/stats" -type BeatInfo struct { +type Info struct { Beat string `json:"beat"` Hostname string `json:"hostname"` Name string `json:"name"` @@ -63,7 +62,7 @@ type BeatInfo struct { Version string `json:"version"` } -type BeatStats struct { +type Stats struct { Beat map[string]interface{} `json:"beat"` FileBeat interface{} `json:"filebeat"` Libbeat interface{} `json:"libbeat"` @@ -140,8 +139,8 @@ func (beat *Beat) createHTTPClient() (*http.Client, error) { } // gatherJSONData query the data source and parse the response JSON -func (beat *Beat) gatherJSONData(url string, value interface{}) error { - request, err := http.NewRequest(beat.Method, url, nil) +func (beat *Beat) gatherJSONData(address string, value interface{}) error { + request, err := http.NewRequest(beat.Method, address, nil) if err != nil { return err } @@ -167,8 +166,8 @@ func (beat *Beat) gatherJSONData(url string, value interface{}) error { } func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { - beatStats := &BeatStats{} - beatInfo := &BeatInfo{} + beatStats := &Stats{} + beatInfo := &Info{} infoURL, err := url.Parse(beat.URL + suffixInfo) if err != nil { diff --git a/plugins/inputs/bind/json_stats.go b/plugins/inputs/bind/json_stats.go index 96a5a9b6e..61307683a 100644 --- a/plugins/inputs/bind/json_stats.go +++ b/plugins/inputs/bind/json_stats.go @@ -64,8 +64,8 @@ func addJSONCounter(acc telegraf.Accumulator, commonTags map[string]string, stat } //Add grouped metrics - for _, metric := range grouper.Metrics() { - acc.AddMetric(metric) + for _, groupedMetric := range grouper.Metrics() { + acc.AddMetric(groupedMetric) } } @@ -144,8 +144,8 @@ func (b *Bind) addStatsJSON(stats jsonStats, acc telegraf.Accumulator, urlTag st } //Add grouped metrics - for _, metric := range grouper.Metrics() { - acc.AddMetric(metric) + for _, groupedMetric := range grouper.Metrics() { + acc.AddMetric(groupedMetric) } } @@ -157,22 +157,30 @@ func (b *Bind) readStatsJSON(addr *url.URL, acc telegraf.Accumulator) error { // Progressively build up full jsonStats struct by parsing the individual HTTP responses for _, suffix := range [...]string{"/server", "/net", "/mem"} { - scrapeURL := addr.String() + suffix + err := func() error { + scrapeURL := addr.String() + suffix + + resp, err := b.client.Get(scrapeURL) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status) + } + + if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { + return fmt.Errorf("unable to decode JSON blob: %s", err) + } + + return nil + }() - resp, err := b.client.Get(scrapeURL) if err != nil { return err } - - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status) - } - - if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { - return fmt.Errorf("Unable to decode JSON blob: %s", err) - } } b.addStatsJSON(stats, acc, addr.Host) diff --git a/plugins/inputs/bind/xml_stats_v2.go b/plugins/inputs/bind/xml_stats_v2.go index ce98b2ddc..5a0092c5a 100644 --- a/plugins/inputs/bind/xml_stats_v2.go +++ b/plugins/inputs/bind/xml_stats_v2.go @@ -81,8 +81,8 @@ func addXMLv2Counter(acc telegraf.Accumulator, commonTags map[string]string, sta } //Add grouped metrics - for _, metric := range grouper.Metrics() { - acc.AddMetric(metric) + for _, groupedMetric := range grouper.Metrics() { + acc.AddMetric(groupedMetric) } } @@ -103,7 +103,7 @@ func (b *Bind) readStatsXMLv2(addr *url.URL, acc telegraf.Accumulator) error { } if err := xml.NewDecoder(resp.Body).Decode(&stats); err != nil { - return fmt.Errorf("Unable to decode XML document: %s", err) + return fmt.Errorf("unable to decode XML document: %s", err) } tags := map[string]string{"url": addr.Host} diff --git a/plugins/inputs/bind/xml_stats_v3.go b/plugins/inputs/bind/xml_stats_v3.go index c4fe7e199..ef303f4bf 100644 --- a/plugins/inputs/bind/xml_stats_v3.go +++ b/plugins/inputs/bind/xml_stats_v3.go @@ -129,8 +129,8 @@ func (b *Bind) addStatsXMLv3(stats v3Stats, acc telegraf.Accumulator, hostPort s } //Add grouped metrics - for _, metric := range grouper.Metrics() { - acc.AddMetric(metric) + for _, groupedMetric := range grouper.Metrics() { + acc.AddMetric(groupedMetric) } } @@ -142,22 +142,30 @@ func (b *Bind) readStatsXMLv3(addr *url.URL, acc telegraf.Accumulator) error { // Progressively build up full v3Stats struct by parsing the individual HTTP responses for _, suffix := range [...]string{"/server", "/net", "/mem"} { - scrapeURL := addr.String() + suffix + err := func() error { + scrapeURL := addr.String() + suffix + + resp, err := b.client.Get(scrapeURL) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status) + } + + if err := xml.NewDecoder(resp.Body).Decode(&stats); err != nil { + return fmt.Errorf("unable to decode XML document: %s", err) + } + + return nil + }() - resp, err := b.client.Get(scrapeURL) if err != nil { return err } - - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("%s returned HTTP status: %s", scrapeURL, resp.Status) - } - - if err := xml.NewDecoder(resp.Body).Decode(&stats); err != nil { - return fmt.Errorf("Unable to decode XML document: %s", err) - } } b.addStatsXMLv3(stats, acc, addr.Host) diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go index de0b56692..d9df7be31 100644 --- a/plugins/inputs/burrow/burrow_test.go +++ b/plugins/inputs/burrow/burrow_test.go @@ -9,8 +9,9 @@ import ( "strings" "testing" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) // remap uri to json file, eg: /v3/kafka -> ./testdata/v3_kafka.json @@ -49,7 +50,7 @@ func getHTTPServerBasicAuth() *httptest.Server { w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`) username, password, authOK := r.BasicAuth() - if authOK == false { + if !authOK { http.Error(w, "Not authorized", 401) return }