diff --git a/plugins/inputs/cassandra/README.md b/plugins/inputs/cassandra/README.md index d89459533..56c36bfe9 100644 --- a/plugins/inputs/cassandra/README.md +++ b/plugins/inputs/cassandra/README.md @@ -19,10 +19,26 @@ Cassandra plugin produces one or more measurements for each metric configured, a Given a configuration like: ```toml +# Read Cassandra metrics through Jolokia [[inputs.cassandra]] + ## DEPRECATED: The cassandra plugin has been deprecated. Please use the + ## jolokia2 plugin instead. + ## + ## see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/jolokia2 + context = "/jolokia/read" - servers = [":8778"] - metrics = ["/java.lang:type=Memory/HeapMemoryUsage"] + ## List of cassandra servers exposing jolokia read service + servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"] + ## List of metrics collected on above servers + ## Each metric consists of a jmx path. + ## This will collect all heap memory usage metrics from the jvm and + ## ReadLatency metrics for all keyspaces and tables. + ## "type=Table" in the query works with Cassandra3.0. Older versions might + ## need to use "type=ColumnFamily" + metrics = [ + "/java.lang:type=Memory/HeapMemoryUsage", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" + ] ``` The collected metrics will be: diff --git a/plugins/inputs/cassandra/cassandra.go b/plugins/inputs/cassandra/cassandra.go index 7f9fe98b2..4a52ef297 100644 --- a/plugins/inputs/cassandra/cassandra.go +++ b/plugins/inputs/cassandra/cassandra.go @@ -49,13 +49,11 @@ type jmxMetric interface { addTagsFields(out map[string]interface{}) } -func newJavaMetric(host string, metric string, - acc telegraf.Accumulator) *javaMetric { +func newJavaMetric(acc telegraf.Accumulator, host string, metric string) *javaMetric { return &javaMetric{host: host, metric: metric, acc: acc} } -func newCassandraMetric(host string, metric string, - acc telegraf.Accumulator) *cassandraMetric { +func newCassandraMetric(acc telegraf.Accumulator, host string, metric string) *cassandraMetric { return &cassandraMetric{host: host, metric: metric, acc: acc} } @@ -72,13 +70,15 @@ func addValuesAsFields(values map[string]interface{}, fields map[string]interfac func parseJmxMetricRequest(mbean string) map[string]string { tokens := make(map[string]string) classAndPairs := strings.Split(mbean, ":") - if classAndPairs[0] == "org.apache.cassandra.metrics" { + switch classAndPairs[0] { + case "org.apache.cassandra.metrics": tokens["class"] = "cassandra" - } else if classAndPairs[0] == "java.lang" { + case "java.lang": tokens["class"] = "java" - } else { + default: return tokens } + pairs := strings.Split(classAndPairs[1], ",") for _, pair := range pairs { p := strings.Split(pair, "=") @@ -147,22 +147,21 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) { // maps in the json response if (tokens["type"] == "Table" || tokens["type"] == "ColumnFamily") && (tokens["keyspace"] == "*" || tokens["scope"] == "*") { - if valuesMap, ok := out["value"]; ok { - for k, v := range valuesMap.(map[string]interface{}) { - addCassandraMetric(k, c, v.(map[string]interface{})) - } - } else { + valuesMap, ok := out["value"] + if !ok { c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out)) return } + for k, v := range valuesMap.(map[string]interface{}) { + addCassandraMetric(k, c, v.(map[string]interface{})) + } } else { - if values, ok := out["value"]; ok { - addCassandraMetric(r.(map[string]interface{})["mbean"].(string), - c, values.(map[string]interface{})) - } else { + values, ok := out["value"] + if !ok { c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out)) return } + addCassandraMetric(r.(map[string]interface{})["mbean"].(string), c, values.(map[string]interface{})) } } @@ -277,10 +276,10 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error { var m jmxMetric if strings.HasPrefix(metric, "/java.lang:") { - m = newJavaMetric(serverTokens["host"], metric, acc) + m = newJavaMetric(acc, serverTokens["host"], metric) } else if strings.HasPrefix(metric, "/org.apache.cassandra.metrics:") { - m = newCassandraMetric(serverTokens["host"], metric, acc) + m = newCassandraMetric(acc, serverTokens["host"], metric) } else { // unsupported metric type acc.AddError(fmt.Errorf("unsupported Cassandra metric [%s], skipping", metric)) diff --git a/plugins/inputs/ceph/README.md b/plugins/inputs/ceph/README.md index 171b64760..dc58adb0f 100644 --- a/plugins/inputs/ceph/README.md +++ b/plugins/inputs/ceph/README.md @@ -45,7 +45,7 @@ the cluster. The currently supported commands are: ### Configuration: ```toml -# Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. +# Collects performance metrics from the MON, OSD, MDS and RGW nodes in a Ceph storage cluster. [[inputs.ceph]] ## This is the recommended interval to poll. Too frequent and you will lose ## data points due to timeouts during rebalancing and recovery diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go index 3445b2d12..7baa28213 100644 --- a/plugins/inputs/ceph/ceph.go +++ b/plugins/inputs/ceph/ceph.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" "os/exec" "path/filepath" "strings" @@ -28,17 +27,19 @@ const ( ) type Ceph struct { - CephBinary string - OsdPrefix string - MonPrefix string - MdsPrefix string - RgwPrefix string - SocketDir string - SocketSuffix string - CephUser string - CephConfig string - GatherAdminSocketStats bool - GatherClusterStats bool + CephBinary string `toml:"ceph_binary"` + OsdPrefix string `toml:"osd_prefix"` + MonPrefix string `toml:"mon_prefix"` + MdsPrefix string `toml:"mds_prefix"` + RgwPrefix string `toml:"rgw_prefix"` + SocketDir string `toml:"socket_dir"` + SocketSuffix string `toml:"socket_suffix"` + CephUser string `toml:"ceph_user"` + CephConfig string `toml:"ceph_config"` + GatherAdminSocketStats bool `toml:"gather_admin_socket_stats"` + GatherClusterStats bool `toml:"gather_cluster_stats"` + + Log telegraf.Logger `toml:"-"` } func (c *Ceph) Description() string { @@ -67,7 +68,14 @@ var sampleConfig = ` ## suffix used to identify socket files socket_suffix = "asok" - ## Ceph user to authenticate as + ## Ceph user to authenticate as, ceph will search for the corresponding keyring + ## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the + ## client section of ceph.conf for example: + ## + ## [client.telegraf] + ## keyring = /etc/ceph/client.telegraf.keyring + ## + ## Consult the ceph documentation for more detail on keyring generation. ceph_user = "client.admin" ## Ceph configuration to use to locate the cluster @@ -76,7 +84,8 @@ var sampleConfig = ` ## Whether to gather statistics via the admin socket gather_admin_socket_stats = true - ## Whether to gather statistics via ceph commands + ## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config + ## to be specified gather_cluster_stats = false ` @@ -112,14 +121,14 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error { acc.AddError(fmt.Errorf("error reading from socket '%s': %v", s.socket, err)) continue } - data, err := parseDump(dump) + data, err := c.parseDump(dump) if err != nil { acc.AddError(fmt.Errorf("error parsing dump from socket '%s': %v", s.socket, err)) continue } for tag, metrics := range data { acc.AddFields(measurement, - map[string]interface{}(metrics), + metrics, map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag}) } } @@ -138,7 +147,7 @@ func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error { // For each job, execute against the cluster, parse and accumulate the data points for _, job := range jobs { - output, err := c.exec(job.command) + output, err := c.execute(job.command) if err != nil { return fmt.Errorf("error executing command: %v", err) } @@ -171,15 +180,17 @@ func init() { var perfDump = func(binary string, socket *socket) (string, error) { cmdArgs := []string{"--admin-daemon", socket.socket} - if socket.sockType == typeOsd { + + switch socket.sockType { + case typeOsd: cmdArgs = append(cmdArgs, "perf", "dump") - } else if socket.sockType == typeMon { + case typeMon: cmdArgs = append(cmdArgs, "perfcounters_dump") - } else if socket.sockType == typeMds { + case typeMds: cmdArgs = append(cmdArgs, "perf", "dump") - } else if socket.sockType == typeRgw { + case typeRgw: cmdArgs = append(cmdArgs, "perf", "dump") - } else { + default: return "", fmt.Errorf("ignoring unknown socket type: %s", socket.sockType) } @@ -268,23 +279,23 @@ type taggedMetricMap map[string]metricMap // Parses a raw JSON string into a taggedMetricMap // Delegates the actual parsing to newTaggedMetricMap(..) -func parseDump(dump string) (taggedMetricMap, error) { +func (c *Ceph) parseDump(dump string) (taggedMetricMap, error) { data := make(map[string]interface{}) err := json.Unmarshal([]byte(dump), &data) if err != nil { return nil, fmt.Errorf("failed to parse json: '%s': %v", dump, err) } - return newTaggedMetricMap(data), nil + return c.newTaggedMetricMap(data), nil } // Builds a TaggedMetricMap out of a generic string map. // The top-level key is used as a tag and all sub-keys are flattened into metrics -func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap { +func (c *Ceph) newTaggedMetricMap(data map[string]interface{}) taggedMetricMap { tmm := make(taggedMetricMap) for tag, datapoints := range data { mm := make(metricMap) - for _, m := range flatten(datapoints) { + for _, m := range c.flatten(datapoints) { mm[m.name()] = m.value } tmm[tag] = mm @@ -296,7 +307,7 @@ func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap { // Nested keys are flattened into ordered slices associated with a metric value. // The key slices are treated as stacks, and are expected to be reversed and concatenated // when passed as metrics to the accumulator. (see (*metric).name()) -func flatten(data interface{}) []*metric { +func (c *Ceph) flatten(data interface{}) []*metric { var metrics []*metric switch val := data.(type) { @@ -305,20 +316,20 @@ func flatten(data interface{}) []*metric { case map[string]interface{}: metrics = make([]*metric, 0, len(val)) for k, v := range val { - for _, m := range flatten(v) { + for _, m := range c.flatten(v) { m.pathStack = append(m.pathStack, k) metrics = append(metrics, m) } } default: - log.Printf("I! [inputs.ceph] ignoring unexpected type '%T' for value %v", val, val) + c.Log.Infof("ignoring unexpected type '%T' for value %v", val, val) } return metrics } -// exec executes the 'ceph' command with the supplied arguments, returning JSON formatted output -func (c *Ceph) exec(command string) (string, error) { +// execute executes the 'ceph' command with the supplied arguments, returning JSON formatted output +func (c *Ceph) execute(command string) (string, error) { cmdArgs := []string{"--conf", c.CephConfig, "--name", c.CephUser, "--format", "json"} cmdArgs = append(cmdArgs, strings.Split(command, " ")...) diff --git a/plugins/inputs/ceph/ceph_test.go b/plugins/inputs/ceph/ceph_test.go index 5cb120e57..a61838bc6 100644 --- a/plugins/inputs/ceph/ceph_test.go +++ b/plugins/inputs/ceph/ceph_test.go @@ -9,8 +9,9 @@ import ( "strings" "testing" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) const ( @@ -29,28 +30,32 @@ func TestParseSockId(t *testing.T) { } func TestParseMonDump(t *testing.T) { - dump, err := parseDump(monPerfDump) + c := &Ceph{Log: testutil.Logger{}} + dump, err := c.parseDump(monPerfDump) require.NoError(t, err) require.InEpsilon(t, int64(5678670180), dump["cluster"]["osd_kb_used"], epsilon) require.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon) } func TestParseOsdDump(t *testing.T) { - dump, err := parseDump(osdPerfDump) + c := &Ceph{Log: testutil.Logger{}} + dump, err := c.parseDump(osdPerfDump) require.NoError(t, err) require.InEpsilon(t, 552132.109360000, dump["filestore"]["commitcycle_interval.sum"], epsilon) require.Equal(t, float64(0), dump["mutex-FileJournal::finisher_lock"]["wait.avgcount"]) } func TestParseMdsDump(t *testing.T) { - dump, err := parseDump(mdsPerfDump) + c := &Ceph{Log: testutil.Logger{}} + dump, err := c.parseDump(mdsPerfDump) require.NoError(t, err) require.InEpsilon(t, 2408386.600934982, dump["mds"]["reply_latency.sum"], epsilon) require.Equal(t, float64(0), dump["throttle-write_buf_throttle"]["wait.avgcount"]) } func TestParseRgwDump(t *testing.T) { - dump, err := parseDump(rgwPerfDump) + c := &Ceph{Log: testutil.Logger{}} + dump, err := c.parseDump(rgwPerfDump) require.NoError(t, err) require.InEpsilon(t, 0.002219876, dump["rgw"]["get_initial_lat.sum"], epsilon) require.Equal(t, float64(0), dump["rgw"]["put_initial_lat.avgcount"]) diff --git a/plugins/inputs/cgroup/README.md b/plugins/inputs/cgroup/README.md index 6982517bc..3b755bbd8 100644 --- a/plugins/inputs/cgroup/README.md +++ b/plugins/inputs/cgroup/README.md @@ -44,12 +44,19 @@ All measurements have the following tags: ### Configuration: ```toml +# Read specific statistics per cgroup # [[inputs.cgroup]] + ## Directories in which to look for files, globs are supported. + ## Consider restricting paths to the set of cgroups you really + ## want to monitor if you have a large number of cgroups, to avoid + ## any cardinality issues. # paths = [ - # "/sys/fs/cgroup/memory", # root cgroup - # "/sys/fs/cgroup/memory/child1", # container cgroup - # "/sys/fs/cgroup/memory/child2/*", # all children cgroups under child2, but not child2 itself + # "/sys/fs/cgroup/memory", + # "/sys/fs/cgroup/memory/child1", + # "/sys/fs/cgroup/memory/child2/*", # ] + ## cgroup stat fields, as file names, globs are supported. + ## these file names are appended to each path from above. # files = ["memory.*usage*", "memory.limit_in_bytes"] ``` diff --git a/plugins/inputs/cgroup/cgroup_linux.go b/plugins/inputs/cgroup/cgroup_linux.go index bb38525b7..6ecfd255a 100644 --- a/plugins/inputs/cgroup/cgroup_linux.go +++ b/plugins/inputs/cgroup/cgroup_linux.go @@ -25,7 +25,7 @@ func (g *CGroup) Gather(acc telegraf.Accumulator) error { acc.AddError(dir.err) continue } - if err := g.gatherDir(dir.path, acc); err != nil { + if err := g.gatherDir(acc, dir.path); err != nil { acc.AddError(err) } } @@ -33,7 +33,7 @@ func (g *CGroup) Gather(acc telegraf.Accumulator) error { return nil } -func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error { +func (g *CGroup) gatherDir(acc telegraf.Accumulator, dir string) error { fields := make(map[string]interface{}) list := make(chan pathInfo) @@ -72,8 +72,8 @@ type pathInfo struct { err error } -func isDir(path string) (bool, error) { - result, err := os.Stat(path) +func isDir(pathToCheck string) (bool, error) { + result, err := os.Stat(pathToCheck) if err != nil { return false, err } diff --git a/plugins/inputs/chrony/chrony_test.go b/plugins/inputs/chrony/chrony_test.go index 60cb69da7..01f5f458d 100644 --- a/plugins/inputs/chrony/chrony_test.go +++ b/plugins/inputs/chrony/chrony_test.go @@ -51,7 +51,7 @@ func TestGather(t *testing.T) { acc.AssertContainsTaggedFields(t, "chrony", fields, tags) } -// fackeExecCommand is a helper function that mock +// fakeExecCommand is a helper function that mock // the exec.Command call (and call the test binary) func fakeExecCommand(command string, args ...string) *exec.Cmd { cs := []string{"-test.run=TestHelperProcess", "--", command} @@ -103,7 +103,9 @@ Leap status : Not synchronized } else { //nolint:errcheck,revive // test will fail anyway fmt.Fprint(os.Stdout, "command not found") + //nolint:revive // error code is important for this "test" os.Exit(1) } + //nolint:revive // error code is important for this "test" os.Exit(0) } diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go index 20c5362b3..10f1f764c 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -15,15 +15,16 @@ import ( dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout" telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" - "github.com/golang/protobuf/proto" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - internaltls "github.com/influxdata/telegraf/plugins/common/tls" - "github.com/influxdata/telegraf/plugins/inputs" + "github.com/golang/protobuf/proto" //nolint:staticcheck // Cannot switch to "google.golang.org/protobuf/proto", "github.com/golang/protobuf/proto" is used by "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" "google.golang.org/grpc" "google.golang.org/grpc/credentials" _ "google.golang.org/grpc/encoding/gzip" // Register GRPC gzip decoder to support compressed telemetry "google.golang.org/grpc/peer" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + internaltls "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" ) const ( @@ -51,15 +52,15 @@ type CiscoTelemetryMDT struct { listener net.Listener // Internal state - aliases map[string]string - dmesFuncs map[string]string - warned map[string]struct{} - extraTags map[string]map[string]struct{} - nxpathMap map[string]map[string]string //per path map - propMap map[string]func(field *telemetry.TelemetryField, value interface{}) interface{} - mutex sync.Mutex - acc telegraf.Accumulator - wg sync.WaitGroup + internalAliases map[string]string + dmesFuncs map[string]string + warned map[string]struct{} + extraTags map[string]map[string]struct{} + nxpathMap map[string]map[string]string //per path map + propMap map[string]func(field *telemetry.TelemetryField, value interface{}) interface{} + mutex sync.Mutex + acc telegraf.Accumulator + wg sync.WaitGroup } type NxPayloadXfromStructure struct { @@ -87,9 +88,9 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { // Invert aliases list c.warned = make(map[string]struct{}) - c.aliases = make(map[string]string, len(c.Aliases)) + c.internalAliases = make(map[string]string, len(c.Aliases)) for alias, encodingPath := range c.Aliases { - c.aliases[encodingPath] = alias + c.internalAliases[encodingPath] = alias } c.initDb() @@ -276,9 +277,9 @@ func (c *CiscoTelemetryMDT) handleTCPClient(conn net.Conn) error { // MdtDialout RPC server method for grpc-dialout transport func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutServer) error { - peer, peerOK := peer.FromContext(stream.Context()) + peerInCtx, peerOK := peer.FromContext(stream.Context()) if peerOK { - c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peer.Addr) + c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr) } var chunkBuffer bytes.Buffer @@ -314,7 +315,7 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS } if peerOK { - c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peer.Addr) + c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr) } return nil @@ -375,8 +376,8 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { } } - for _, metric := range grouper.Metrics() { - c.acc.AddMetric(metric) + for _, groupedMetric := range grouper.Metrics() { + c.acc.AddMetric(groupedMetric) } } @@ -540,7 +541,7 @@ func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, fie if value := decodeValue(field); value != nil { // Do alias lookup, to shorten measurement names measurement := encodingPath - if alias, ok := c.aliases[encodingPath]; ok { + if alias, ok := c.internalAliases[encodingPath]; ok { measurement = alias } else { c.mutex.Lock() diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go index 69b2fd115..745b26dea 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -9,11 +9,12 @@ import ( "testing" dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout" - telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" - "github.com/golang/protobuf/proto" - "github.com/influxdata/telegraf/testutil" + telemetryBis "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" + "github.com/golang/protobuf/proto" //nolint:staticcheck // Cannot switch to "google.golang.org/protobuf/proto", "github.com/golang/protobuf/proto" is used by "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" "github.com/stretchr/testify/require" "google.golang.org/grpc" + + "github.com/influxdata/telegraf/testutil" ) func TestHandleTelemetryTwoSimple(t *testing.T) { @@ -23,55 +24,55 @@ func TestHandleTelemetryTwoSimple(t *testing.T) { // error is expected since we are passing in dummy transport require.Error(t, err) - telemetry := &telemetry.Telemetry{ + telemetry := &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "type:model/some/path", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "name", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "str"}, }, { Name: "uint64", - ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 1234}, + ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 1234}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "bool", - ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: true}, + ValueByType: &telemetryBis.TelemetryField_BoolValue{BoolValue: true}, }, }, }, }, }, { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "name", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str2"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "str2"}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "bool", - ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: false}, + ValueByType: &telemetryBis.TelemetryField_BoolValue{BoolValue: false}, }, }, }, @@ -101,26 +102,26 @@ func TestHandleTelemetrySingleNested(t *testing.T) { // error is expected since we are passing in dummy transport require.Error(t, err) - telemetry := &telemetry.Telemetry{ + telemetry := &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "type:model/nested/path", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "nested", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "key", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "level", - ValueByType: &telemetry.TelemetryField_DoubleValue{DoubleValue: 3}, + ValueByType: &telemetryBis.TelemetryField_DoubleValue{DoubleValue: 3}, }, }, }, @@ -130,16 +131,16 @@ func TestHandleTelemetrySingleNested(t *testing.T) { }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "nested", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "value", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "foo", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, @@ -169,49 +170,49 @@ func TestHandleEmbeddedTags(t *testing.T) { // error is expected since we are passing in dummy transport require.Error(t, err) - telemetry := &telemetry.Telemetry{ + telemetry := &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "type:model/extra", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "foo", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "list", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "name", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry1"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "entry1"}, }, { Name: "test", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"}, }, }, }, { Name: "list", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "name", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry2"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "entry2"}, }, { Name: "test", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, @@ -242,57 +243,57 @@ func TestHandleNXAPI(t *testing.T) { // error is expected since we are passing in dummy transport require.Error(t, err) - telemetry := &telemetry.Telemetry{ + telemetry := &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "show nxapi", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "foo", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "TABLE_nxapi", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "ROW_nxapi", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "index", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i1"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "i1"}, }, { Name: "value", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"}, }, }, }, { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "index", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i2"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "i2"}, }, { Name: "value", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, @@ -331,45 +332,45 @@ func TestHandleNXAPIXformNXAPI(t *testing.T) { // error is expected since we are passing in dummy transport require.Error(t, err) - telemetry := &telemetry.Telemetry{ + telemetry := &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "show processes cpu", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "foo", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "TABLE_process_cpu", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "ROW_process_cpu", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "index", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i1"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "i1"}, }, { Name: "value", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"}, }, }, }, @@ -405,57 +406,57 @@ func TestHandleNXXformMulti(t *testing.T) { // error is expected since we are passing in dummy transport require.Error(t, err) - telemetry := &telemetry.Telemetry{ + telemetry := &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "sys/lldp", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "foo", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "fooEntity", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "attributes", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "rn", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "some-rn"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "some-rn"}, }, { Name: "portIdV", - ValueByType: &telemetry.TelemetryField_Uint32Value{Uint32Value: 12}, + ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: 12}, }, { Name: "portDesc", - ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 100}, + ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 100}, }, { Name: "test", - ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 281474976710655}, + ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 281474976710655}, }, { Name: "subscriptionId", - ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 2814749767106551}, + ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 2814749767106551}, }, }, }, @@ -490,45 +491,45 @@ func TestHandleNXDME(t *testing.T) { // error is expected since we are passing in dummy transport require.Error(t, err) - telemetry := &telemetry.Telemetry{ + telemetry := &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "sys/dme", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "foo", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "fooEntity", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "attributes", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "rn", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "some-rn"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "some-rn"}, }, { Name: "value", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"}, }, }, }, @@ -584,30 +585,30 @@ func TestTCPDialoutOverflow(t *testing.T) { require.Contains(t, acc.Errors, errors.New("dialout packet too long: 1000000000")) } -func mockTelemetryMessage() *telemetry.Telemetry { - return &telemetry.Telemetry{ +func mockTelemetryMessage() *telemetryBis.Telemetry { + return &telemetryBis.Telemetry{ MsgTimestamp: 1543236572000, EncodingPath: "type:model/some/path", - NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, - Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, - DataGpbkv: []*telemetry.TelemetryField{ + NodeId: &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetryBis.TelemetryField{ { - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "keys", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "name", - ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"}, + ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "str"}, }, }, }, { Name: "content", - Fields: []*telemetry.TelemetryField{ + Fields: []*telemetryBis.TelemetryField{ { Name: "value", - ValueByType: &telemetry.TelemetryField_Sint64Value{Sint64Value: -1}, + ValueByType: &telemetryBis.TelemetryField_Sint64Value{Sint64Value: -1}, }, }, }, diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_util.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_util.go index e9fb4efe0..8f6ea93ea 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_util.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_util.go @@ -139,6 +139,7 @@ func (c *CiscoTelemetryMDT) nxosValueXform(field *telemetry.TelemetryField, valu return nil //Xformation supported is only from String case "float": + //nolint:revive // switch needed for `.(type)` switch val := field.ValueByType.(type) { case *telemetry.TelemetryField_StringValue: if valf, err := strconv.ParseFloat(val.StringValue, 64); err == nil { diff --git a/plugins/inputs/clickhouse/clickhouse.go b/plugins/inputs/clickhouse/clickhouse.go index 4e87431c0..3a46390b4 100644 --- a/plugins/inputs/clickhouse/clickhouse.go +++ b/plugins/inputs/clickhouse/clickhouse.go @@ -24,7 +24,7 @@ var defaultTimeout = 5 * time.Second var sampleConfig = ` ## Username for authorization on ClickHouse server - ## example: username = "default"" + ## example: username = "default" username = "default" ## Password for authorization on ClickHouse server @@ -560,11 +560,11 @@ func (e *clickhouseError) Error() string { return fmt.Sprintf("received error code %d: %s", e.StatusCode, e.body) } -func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error { - q := url.Query() +func (ch *ClickHouse) execQuery(address *url.URL, query string, i interface{}) error { + q := address.Query() q.Set("query", query+" FORMAT JSON") - url.RawQuery = q.Encode() - req, _ := http.NewRequest("GET", url.String(), nil) + address.RawQuery = q.Encode() + req, _ := http.NewRequest("GET", address.String(), nil) if ch.Username != "" { req.Header.Add("X-ClickHouse-User", ch.Username) } diff --git a/plugins/inputs/clickhouse/clickhouse_test.go b/plugins/inputs/clickhouse/clickhouse_test.go index d6dcf4422..6e308b509 100644 --- a/plugins/inputs/clickhouse/clickhouse_test.go +++ b/plugins/inputs/clickhouse/clickhouse_test.go @@ -8,28 +8,28 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" ) func TestClusterIncludeExcludeFilter(t *testing.T) { ch := ClickHouse{} - if assert.Equal(t, "", ch.clusterIncludeExcludeFilter()) { - ch.ClusterExclude = []string{"test_cluster"} - assert.Equal(t, "WHERE cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) + require.Equal(t, "", ch.clusterIncludeExcludeFilter()) + ch.ClusterExclude = []string{"test_cluster"} + require.Equal(t, "WHERE cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) - ch.ClusterExclude = []string{"test_cluster"} - ch.ClusterInclude = []string{"cluster"} - assert.Equal(t, "WHERE cluster IN ('cluster') OR cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) + ch.ClusterExclude = []string{"test_cluster"} + ch.ClusterInclude = []string{"cluster"} + require.Equal(t, "WHERE cluster IN ('cluster') OR cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) - ch.ClusterExclude = []string{} - ch.ClusterInclude = []string{"cluster1", "cluster2"} - assert.Equal(t, "WHERE cluster IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) + ch.ClusterExclude = []string{} + ch.ClusterInclude = []string{"cluster1", "cluster2"} + require.Equal(t, "WHERE cluster IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) - ch.ClusterExclude = []string{"cluster1", "cluster2"} - ch.ClusterInclude = []string{} - assert.Equal(t, "WHERE cluster NOT IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) - } + ch.ClusterExclude = []string{"cluster1", "cluster2"} + ch.ClusterInclude = []string{} + require.Equal(t, "WHERE cluster NOT IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) } func TestChInt64(t *testing.T) { @@ -42,9 +42,9 @@ func TestChInt64(t *testing.T) { } for src, expected := range assets { var v chUInt64 - if err := v.UnmarshalJSON([]byte(src)); assert.NoError(t, err) { - assert.Equal(t, expected, uint64(v)) - } + err := v.UnmarshalJSON([]byte(src)) + require.NoError(t, err) + require.Equal(t, expected, uint64(v)) } } @@ -74,7 +74,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.events"): err := enc.Encode(result{ Data: []struct { @@ -91,7 +91,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.metrics"): err := enc.Encode(result{ Data: []struct { @@ -108,7 +108,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.asynchronous_metrics"): err := enc.Encode(result{ Data: []struct { @@ -125,7 +125,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "zk_exists"): err := enc.Encode(result{ Data: []struct { @@ -136,7 +136,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "zk_root_nodes"): err := enc.Encode(result{ Data: []struct { @@ -147,7 +147,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "replication_queue_exists"): err := enc.Encode(result{ Data: []struct { @@ -158,7 +158,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "replication_too_many_tries_replicas"): err := enc.Encode(result{ Data: []struct { @@ -171,7 +171,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.detached_parts"): err := enc.Encode(result{ Data: []struct { @@ -182,7 +182,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.dictionaries"): err := enc.Encode(result{ Data: []struct { @@ -197,7 +197,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.mutations"): err := enc.Encode(result{ Data: []struct { @@ -212,7 +212,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.disks"): err := enc.Encode(result{ Data: []struct { @@ -229,7 +229,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.processes"): err := enc.Encode(result{ Data: []struct { @@ -258,7 +258,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "text_log_exists"): err := enc.Encode(result{ Data: []struct { @@ -269,7 +269,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "system.text_log"): err := enc.Encode(result{ Data: []struct { @@ -298,7 +298,7 @@ func TestGather(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) } })) ch = &ClickHouse{ @@ -309,7 +309,7 @@ func TestGather(t *testing.T) { acc = &testutil.Accumulator{} ) defer ts.Close() - assert.NoError(t, ch.Gather(acc)) + require.NoError(t, ch.Gather(acc)) acc.AssertContainsTaggedFields(t, "clickhouse_tables", map[string]interface{}{ @@ -451,7 +451,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "replication_queue_exists"): err := enc.Encode(result{ Data: []struct { @@ -462,7 +462,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) case strings.Contains(query, "text_log_exists"): err := enc.Encode(result{ Data: []struct { @@ -473,7 +473,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) } })) ch = &ClickHouse{ @@ -485,7 +485,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) { acc = &testutil.Accumulator{} ) defer ts.Close() - assert.NoError(t, ch.Gather(acc)) + require.NoError(t, ch.Gather(acc)) acc.AssertDoesNotContainMeasurement(t, "clickhouse_zookeeper") acc.AssertDoesNotContainMeasurement(t, "clickhouse_replication_queue") @@ -503,7 +503,7 @@ func TestWrongJSONMarshalling(t *testing.T) { err := enc.Encode(result{ Data: []struct{}{}, }) - assert.NoError(t, err) + require.NoError(t, err) })) ch = &ClickHouse{ Servers: []string{ @@ -514,9 +514,9 @@ func TestWrongJSONMarshalling(t *testing.T) { acc = &testutil.Accumulator{} ) defer ts.Close() - assert.NoError(t, ch.Gather(acc)) + require.NoError(t, ch.Gather(acc)) - assert.Equal(t, 0, len(acc.Metrics)) + require.Equal(t, 0, len(acc.Metrics)) allMeasurements := []string{ "clickhouse_events", "clickhouse_metrics", @@ -531,7 +531,7 @@ func TestWrongJSONMarshalling(t *testing.T) { "clickhouse_processes", "clickhouse_text_log", } - assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors)) + require.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors)) } func TestOfflineServer(t *testing.T) { @@ -547,9 +547,9 @@ func TestOfflineServer(t *testing.T) { }, } ) - assert.NoError(t, ch.Gather(acc)) + require.NoError(t, ch.Gather(acc)) - assert.Equal(t, 0, len(acc.Metrics)) + require.Equal(t, 0, len(acc.Metrics)) allMeasurements := []string{ "clickhouse_events", "clickhouse_metrics", @@ -564,7 +564,7 @@ func TestOfflineServer(t *testing.T) { "clickhouse_processes", "clickhouse_text_log", } - assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors)) + require.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors)) } func TestAutoDiscovery(t *testing.T) { @@ -574,8 +574,8 @@ func TestAutoDiscovery(t *testing.T) { Data interface{} `json:"data"` } enc := json.NewEncoder(w) - switch query := r.URL.Query().Get("query"); { - case strings.Contains(query, "system.clusters"): + query := r.URL.Query().Get("query") + if strings.Contains(query, "system.clusters") { err := enc.Encode(result{ Data: []struct { Cluster string `json:"test"` @@ -589,7 +589,7 @@ func TestAutoDiscovery(t *testing.T) { }, }, }) - assert.NoError(t, err) + require.NoError(t, err) } })) ch = &ClickHouse{ @@ -602,5 +602,5 @@ func TestAutoDiscovery(t *testing.T) { acc = &testutil.Accumulator{} ) defer ts.Close() - assert.NoError(t, ch.Gather(acc)) + require.NoError(t, ch.Gather(acc)) } diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md index d7c803c8c..a0c175e1e 100644 --- a/plugins/inputs/cloudwatch/README.md +++ b/plugins/inputs/cloudwatch/README.md @@ -16,6 +16,7 @@ API endpoint. In the following order the plugin will attempt to authenticate. ### Configuration: ```toml +# Pull Metric Statistics from Amazon CloudWatch [[inputs.cloudwatch]] ## Amazon Region region = "us-east-1" @@ -101,7 +102,7 @@ API endpoint. In the following order the plugin will attempt to authenticate. # # ## Dimension filters for Metric. All dimensions defined for the metric names # ## must be specified in order to retrieve the metric statistics. - # ## 'value' has wildcard / 'glob' matching support such as `p-*`. + # ## 'value' has wildcard / 'glob' matching support such as 'p-*'. # [[inputs.cloudwatch.metrics.dimensions]] # name = "LoadBalancerName" # value = "p-example" diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index 34088110e..c4df8f9a7 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -10,15 +10,16 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/cloudwatch" + cwClient "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" internalaws "github.com/influxdata/telegraf/config/aws" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/limiter" - "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/common/proxy" + internalMetric "github.com/influxdata/telegraf/metric" + internalProxy "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -36,7 +37,7 @@ type CloudWatch struct { StatisticInclude []string `toml:"statistic_include"` Timeout config.Duration `toml:"timeout"` - proxy.HTTPProxy + internalProxy.HTTPProxy Period config.Duration `toml:"period"` Delay config.Duration `toml:"delay"` @@ -76,12 +77,12 @@ type metricCache struct { ttl time.Duration built time.Time metrics []filteredMetric - queries []*cloudwatch.MetricDataQuery + queries []*cwClient.MetricDataQuery } type cloudwatchClient interface { - ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) - GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) + ListMetrics(*cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error) + GetMetricData(*cwClient.GetMetricDataInput) (*cwClient.GetMetricDataOutput, error) } // SampleConfig returns the default configuration of the Cloudwatch input plugin. @@ -171,7 +172,7 @@ func (c *CloudWatch) SampleConfig() string { # # ## Dimension filters for Metric. All dimensions defined for the metric names # ## must be specified in order to retrieve the metric statistics. - # ## 'value' has wildcard / 'glob' matching support. such as 'p-*'. + # ## 'value' has wildcard / 'glob' matching support such as 'p-*'. # [[inputs.cloudwatch.metrics.dimensions]] # name = "LoadBalancerName" # value = "p-example" @@ -223,11 +224,11 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { wg := sync.WaitGroup{} rLock := sync.Mutex{} - results := []*cloudwatch.MetricDataResult{} + results := []*cwClient.MetricDataResult{} // 500 is the maximum number of metric data queries a `GetMetricData` request can contain. batchSize := 500 - var batches [][]*cloudwatch.MetricDataQuery + var batches [][]*cwClient.MetricDataQuery for batchSize < len(queries) { queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize]) @@ -237,7 +238,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { for i := range batches { wg.Add(1) <-lmtr.C - go func(inm []*cloudwatch.MetricDataQuery) { + go func(inm []*cwClient.MetricDataQuery) { defer wg.Done() result, err := c.gatherMetrics(c.getDataInputs(inm)) if err != nil { @@ -294,7 +295,7 @@ func (c *CloudWatch) initializeCloudWatch() error { } loglevel := aws.LogOff - c.client = cloudwatch.New(configProvider, cfg.WithLogLevel(loglevel)) + c.client = cwClient.New(configProvider, cfg.WithLogLevel(loglevel)) // Initialize regex matchers for each Dimension value. for _, m := range c.Metrics { @@ -312,7 +313,7 @@ func (c *CloudWatch) initializeCloudWatch() error { } type filteredMetric struct { - metrics []*cloudwatch.Metric + metrics []*cwClient.Metric statFilter filter.Filter } @@ -327,17 +328,17 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) { // check for provided metric filter if c.Metrics != nil { for _, m := range c.Metrics { - metrics := []*cloudwatch.Metric{} + metrics := []*cwClient.Metric{} if !hasWildcard(m.Dimensions) { - dimensions := make([]*cloudwatch.Dimension, len(m.Dimensions)) + dimensions := make([]*cwClient.Dimension, len(m.Dimensions)) for k, d := range m.Dimensions { - dimensions[k] = &cloudwatch.Dimension{ + dimensions[k] = &cwClient.Dimension{ Name: aws.String(d.Name), Value: aws.String(d.Value), } } for _, name := range m.MetricNames { - metrics = append(metrics, &cloudwatch.Metric{ + metrics = append(metrics, &cwClient.Metric{ Namespace: aws.String(c.Namespace), MetricName: aws.String(name), Dimensions: dimensions, @@ -351,7 +352,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) { for _, name := range m.MetricNames { for _, metric := range allMetrics { if isSelected(name, metric, m.Dimensions) { - metrics = append(metrics, &cloudwatch.Metric{ + metrics = append(metrics, &cwClient.Metric{ Namespace: aws.String(c.Namespace), MetricName: aws.String(name), Dimensions: metric.Dimensions, @@ -399,11 +400,11 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) { } // fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace. -func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) { - metrics := []*cloudwatch.Metric{} +func (c *CloudWatch) fetchNamespaceMetrics() ([]*cwClient.Metric, error) { + metrics := []*cwClient.Metric{} var token *string - var params *cloudwatch.ListMetricsInput + var params *cwClient.ListMetricsInput var recentlyActive *string switch c.RecentlyActive { @@ -412,9 +413,9 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) { default: recentlyActive = nil } - params = &cloudwatch.ListMetricsInput{ + params = &cwClient.ListMetricsInput{ Namespace: aws.String(c.Namespace), - Dimensions: []*cloudwatch.DimensionFilter{}, + Dimensions: []*cwClient.DimensionFilter{}, NextToken: token, MetricName: nil, RecentlyActive: recentlyActive, @@ -451,75 +452,75 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) { } // getDataQueries gets all of the possible queries so we can maximize the request payload. -func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cloudwatch.MetricDataQuery { +func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClient.MetricDataQuery { if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() { return c.metricCache.queries } c.queryDimensions = map[string]*map[string]string{} - dataQueries := []*cloudwatch.MetricDataQuery{} + dataQueries := []*cwClient.MetricDataQuery{} for i, filtered := range filteredMetrics { for j, metric := range filtered.metrics { id := strconv.Itoa(j) + "_" + strconv.Itoa(i) dimension := ctod(metric.Dimensions) if filtered.statFilter.Match("average") { c.queryDimensions["average_"+id] = dimension - dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + dataQueries = append(dataQueries, &cwClient.MetricDataQuery{ Id: aws.String("average_" + id), Label: aws.String(snakeCase(*metric.MetricName + "_average")), - MetricStat: &cloudwatch.MetricStat{ + MetricStat: &cwClient.MetricStat{ Metric: metric, Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), - Stat: aws.String(cloudwatch.StatisticAverage), + Stat: aws.String(cwClient.StatisticAverage), }, }) } if filtered.statFilter.Match("maximum") { c.queryDimensions["maximum_"+id] = dimension - dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + dataQueries = append(dataQueries, &cwClient.MetricDataQuery{ Id: aws.String("maximum_" + id), Label: aws.String(snakeCase(*metric.MetricName + "_maximum")), - MetricStat: &cloudwatch.MetricStat{ + MetricStat: &cwClient.MetricStat{ Metric: metric, Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), - Stat: aws.String(cloudwatch.StatisticMaximum), + Stat: aws.String(cwClient.StatisticMaximum), }, }) } if filtered.statFilter.Match("minimum") { c.queryDimensions["minimum_"+id] = dimension - dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + dataQueries = append(dataQueries, &cwClient.MetricDataQuery{ Id: aws.String("minimum_" + id), Label: aws.String(snakeCase(*metric.MetricName + "_minimum")), - MetricStat: &cloudwatch.MetricStat{ + MetricStat: &cwClient.MetricStat{ Metric: metric, Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), - Stat: aws.String(cloudwatch.StatisticMinimum), + Stat: aws.String(cwClient.StatisticMinimum), }, }) } if filtered.statFilter.Match("sum") { c.queryDimensions["sum_"+id] = dimension - dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + dataQueries = append(dataQueries, &cwClient.MetricDataQuery{ Id: aws.String("sum_" + id), Label: aws.String(snakeCase(*metric.MetricName + "_sum")), - MetricStat: &cloudwatch.MetricStat{ + MetricStat: &cwClient.MetricStat{ Metric: metric, Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), - Stat: aws.String(cloudwatch.StatisticSum), + Stat: aws.String(cwClient.StatisticSum), }, }) } if filtered.statFilter.Match("sample_count") { c.queryDimensions["sample_count_"+id] = dimension - dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + dataQueries = append(dataQueries, &cwClient.MetricDataQuery{ Id: aws.String("sample_count_" + id), Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")), - MetricStat: &cloudwatch.MetricStat{ + MetricStat: &cwClient.MetricStat{ Metric: metric, Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), - Stat: aws.String(cloudwatch.StatisticSampleCount), + Stat: aws.String(cwClient.StatisticSampleCount), }, }) } @@ -546,9 +547,9 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cloudwa // gatherMetrics gets metric data from Cloudwatch. func (c *CloudWatch) gatherMetrics( - params *cloudwatch.GetMetricDataInput, -) ([]*cloudwatch.MetricDataResult, error) { - results := []*cloudwatch.MetricDataResult{} + params *cwClient.GetMetricDataInput, +) ([]*cwClient.MetricDataResult, error) { + results := []*cwClient.MetricDataResult{} for { resp, err := c.client.GetMetricData(params) @@ -568,10 +569,10 @@ func (c *CloudWatch) gatherMetrics( func (c *CloudWatch) aggregateMetrics( acc telegraf.Accumulator, - metricDataResults []*cloudwatch.MetricDataResult, + metricDataResults []*cwClient.MetricDataResult, ) error { var ( - grouper = metric.NewSeriesGrouper() + grouper = internalMetric.NewSeriesGrouper() namespace = sanitizeMeasurement(c.Namespace) ) @@ -626,7 +627,7 @@ func snakeCase(s string) string { } // ctod converts cloudwatch dimensions to regular dimensions. -func ctod(cDimensions []*cloudwatch.Dimension) *map[string]string { +func ctod(cDimensions []*cwClient.Dimension) *map[string]string { dimensions := map[string]string{} for i := range cDimensions { dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value @@ -634,8 +635,8 @@ func ctod(cDimensions []*cloudwatch.Dimension) *map[string]string { return &dimensions } -func (c *CloudWatch) getDataInputs(dataQueries []*cloudwatch.MetricDataQuery) *cloudwatch.GetMetricDataInput { - return &cloudwatch.GetMetricDataInput{ +func (c *CloudWatch) getDataInputs(dataQueries []*cwClient.MetricDataQuery) *cwClient.GetMetricDataInput { + return &cwClient.GetMetricDataInput{ StartTime: aws.Time(c.windowStart), EndTime: aws.Time(c.windowEnd), MetricDataQueries: dataQueries, @@ -656,7 +657,7 @@ func hasWildcard(dimensions []*Dimension) bool { return false } -func isSelected(name string, metric *cloudwatch.Metric, dimensions []*Dimension) bool { +func isSelected(name string, metric *cwClient.Metric, dimensions []*Dimension) bool { if name != *metric.MetricName { return false } diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index 158f29a1b..ccd27ec22 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -6,8 +6,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/cloudwatch" - "github.com/stretchr/testify/assert" + cwClient "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/config" @@ -18,13 +17,13 @@ import ( type mockGatherCloudWatchClient struct{} -func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) { - return &cloudwatch.ListMetricsOutput{ - Metrics: []*cloudwatch.Metric{ +func (m *mockGatherCloudWatchClient) ListMetrics(params *cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error) { + return &cwClient.ListMetricsOutput{ + Metrics: []*cwClient.Metric{ { Namespace: params.Namespace, MetricName: aws.String("Latency"), - Dimensions: []*cloudwatch.Dimension{ + Dimensions: []*cwClient.Dimension{ { Name: aws.String("LoadBalancerName"), Value: aws.String("p-example"), @@ -35,9 +34,9 @@ func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsI }, nil } -func (m *mockGatherCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) { - return &cloudwatch.GetMetricDataOutput{ - MetricDataResults: []*cloudwatch.MetricDataResult{ +func (m *mockGatherCloudWatchClient) GetMetricData(params *cwClient.GetMetricDataInput) (*cwClient.GetMetricDataOutput, error) { + return &cwClient.GetMetricDataOutput{ + MetricDataResults: []*cwClient.MetricDataResult{ { Id: aws.String("minimum_0_0"), Label: aws.String("latency_minimum"), @@ -98,8 +97,8 @@ func (m *mockGatherCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricD } func TestSnakeCase(t *testing.T) { - assert.Equal(t, "cluster_name", snakeCase("Cluster Name")) - assert.Equal(t, "broker_id", snakeCase("Broker ID")) + require.Equal(t, "cluster_name", snakeCase("Cluster Name")) + require.Equal(t, "broker_id", snakeCase("Broker ID")) } func TestGather(t *testing.T) { @@ -116,7 +115,7 @@ func TestGather(t *testing.T) { var acc testutil.Accumulator c.client = &mockGatherCloudWatchClient{} - assert.NoError(t, acc.GatherError(c.Gather)) + require.NoError(t, acc.GatherError(c.Gather)) fields := map[string]interface{}{} fields["latency_minimum"] = 0.1 @@ -129,14 +128,14 @@ func TestGather(t *testing.T) { tags["region"] = "us-east-1" tags["load_balancer_name"] = "p-example" - assert.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) + require.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) } type mockSelectMetricsCloudWatchClient struct{} -func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) { - metrics := []*cloudwatch.Metric{} +func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error) { + metrics := []*cwClient.Metric{} // 4 metrics are available metricNames := []string{"Latency", "RequestCount", "HealthyHostCount", "UnHealthyHostCount"} // for 3 ELBs @@ -146,10 +145,10 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetric for _, m := range metricNames { for _, lb := range loadBalancers { // For each metric/ELB pair, we get an aggregate value across all AZs. - metrics = append(metrics, &cloudwatch.Metric{ + metrics = append(metrics, &cwClient.Metric{ Namespace: aws.String("AWS/ELB"), MetricName: aws.String(m), - Dimensions: []*cloudwatch.Dimension{ + Dimensions: []*cwClient.Dimension{ { Name: aws.String("LoadBalancerName"), Value: aws.String(lb), @@ -158,10 +157,10 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetric }) for _, az := range availabilityZones { // We get a metric for each metric/ELB/AZ triplet. - metrics = append(metrics, &cloudwatch.Metric{ + metrics = append(metrics, &cwClient.Metric{ Namespace: aws.String("AWS/ELB"), MetricName: aws.String(m), - Dimensions: []*cloudwatch.Dimension{ + Dimensions: []*cwClient.Dimension{ { Name: aws.String("LoadBalancerName"), Value: aws.String(lb), @@ -176,13 +175,13 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetric } } - result := &cloudwatch.ListMetricsOutput{ + result := &cwClient.ListMetricsOutput{ Metrics: metrics, } return result, nil } -func (m *mockSelectMetricsCloudWatchClient) GetMetricData(_ *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) { +func (m *mockSelectMetricsCloudWatchClient) GetMetricData(_ *cwClient.GetMetricDataInput) (*cwClient.GetMetricDataOutput, error) { return nil, nil } @@ -212,24 +211,24 @@ func TestSelectMetrics(t *testing.T) { }, } err := c.initializeCloudWatch() - assert.NoError(t, err) + require.NoError(t, err) c.client = &mockSelectMetricsCloudWatchClient{} filtered, err := getFilteredMetrics(c) // We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2 // AZs. We should get 12 metrics. - assert.Equal(t, 12, len(filtered[0].metrics)) - assert.NoError(t, err) + require.Equal(t, 12, len(filtered[0].metrics)) + require.NoError(t, err) } func TestGenerateStatisticsInputParams(t *testing.T) { - d := &cloudwatch.Dimension{ + d := &cwClient.Dimension{ Name: aws.String("LoadBalancerName"), Value: aws.String("p-example"), } - m := &cloudwatch.Metric{ + m := &cwClient.Metric{ MetricName: aws.String("Latency"), - Dimensions: []*cloudwatch.Dimension{d}, + Dimensions: []*cwClient.Dimension{d}, } duration, _ := time.ParseDuration("1m") @@ -248,25 +247,25 @@ func TestGenerateStatisticsInputParams(t *testing.T) { c.updateWindow(now) statFilter, _ := filter.NewIncludeExcludeFilter(nil, nil) - queries := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}}) + queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}}) params := c.getDataInputs(queries) - assert.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) - assert.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) + require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) + require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) require.Len(t, params.MetricDataQueries, 5) - assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) - assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) + require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) + require.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) } func TestGenerateStatisticsInputParamsFiltered(t *testing.T) { - d := &cloudwatch.Dimension{ + d := &cwClient.Dimension{ Name: aws.String("LoadBalancerName"), Value: aws.String("p-example"), } - m := &cloudwatch.Metric{ + m := &cwClient.Metric{ MetricName: aws.String("Latency"), - Dimensions: []*cloudwatch.Dimension{d}, + Dimensions: []*cwClient.Dimension{d}, } duration, _ := time.ParseDuration("1m") @@ -285,14 +284,14 @@ func TestGenerateStatisticsInputParamsFiltered(t *testing.T) { c.updateWindow(now) statFilter, _ := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil) - queries := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}}) + queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}}) params := c.getDataInputs(queries) - assert.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) - assert.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) + require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) + require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) require.Len(t, params.MetricDataQueries, 2) - assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) - assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) + require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) + require.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) } func TestMetricsCacheTimeout(t *testing.T) { @@ -302,9 +301,9 @@ func TestMetricsCacheTimeout(t *testing.T) { ttl: time.Minute, } - assert.True(t, cache.isValid()) + require.True(t, cache.isValid()) cache.built = time.Now().Add(-time.Minute) - assert.False(t, cache.isValid()) + require.False(t, cache.isValid()) } func TestUpdateWindow(t *testing.T) { @@ -319,23 +318,23 @@ func TestUpdateWindow(t *testing.T) { now := time.Now() - assert.True(t, c.windowEnd.IsZero()) - assert.True(t, c.windowStart.IsZero()) + require.True(t, c.windowEnd.IsZero()) + require.True(t, c.windowStart.IsZero()) c.updateWindow(now) newStartTime := c.windowEnd // initial window just has a single period - assert.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) - assert.EqualValues(t, c.windowStart, now.Add(-time.Duration(c.Delay)).Add(-time.Duration(c.Period))) + require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) + require.EqualValues(t, c.windowStart, now.Add(-time.Duration(c.Delay)).Add(-time.Duration(c.Period))) now = time.Now() c.updateWindow(now) // subsequent window uses previous end time as start time - assert.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) - assert.EqualValues(t, c.windowStart, newStartTime) + require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) + require.EqualValues(t, c.windowStart, newStartTime) } func TestProxyFunction(t *testing.T) { diff --git a/plugins/inputs/couchbase/couchbase.go b/plugins/inputs/couchbase/couchbase.go index ef66cb8d1..e89393ee8 100644 --- a/plugins/inputs/couchbase/couchbase.go +++ b/plugins/inputs/couchbase/couchbase.go @@ -7,7 +7,8 @@ import ( "sync" "time" - couchbase "github.com/couchbase/go-couchbase" + couchbaseClient "github.com/couchbase/go-couchbase" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/plugins/inputs" @@ -33,7 +34,7 @@ var sampleConfig = ` ## If no port is specified, 8091 is used. servers = ["http://localhost:8091"] - ## Filter fields to include only here. + ## Filter bucket fields to include only here. # bucket_stats_included = ["quota_percent_used", "ops_per_sec", "disk_fetches", "item_count", "disk_used", "data_used", "mem_used"] ` @@ -45,14 +46,14 @@ func (cb *Couchbase) SampleConfig() string { } func (cb *Couchbase) Description() string { - return "Read metrics from one or many couchbase clusters" + return "Read per-node and per-bucket metrics from Couchbase" } // Reads stats from all configured clusters. Accumulates stats. // Returns one of the errors encountered while gathering stats (if any). func (cb *Couchbase) Gather(acc telegraf.Accumulator) error { if len(cb.Servers) == 0 { - return cb.gatherServer("http://localhost:8091/", acc, nil) + return cb.gatherServer(acc, "http://localhost:8091/", nil) } var wg sync.WaitGroup @@ -60,7 +61,7 @@ func (cb *Couchbase) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(serv string) { defer wg.Done() - acc.AddError(cb.gatherServer(serv, acc, nil)) + acc.AddError(cb.gatherServer(acc, serv, nil)) }(serv) } @@ -69,9 +70,9 @@ func (cb *Couchbase) Gather(acc telegraf.Accumulator) error { return nil } -func (cb *Couchbase) gatherServer(addr string, acc telegraf.Accumulator, pool *couchbase.Pool) error { +func (cb *Couchbase) gatherServer(acc telegraf.Accumulator, addr string, pool *couchbaseClient.Pool) error { if pool == nil { - client, err := couchbase.Connect(addr) + client, err := couchbaseClient.Connect(addr) if err != nil { return err } diff --git a/plugins/inputs/couchbase/couchbase_test.go b/plugins/inputs/couchbase/couchbase_test.go index 25728544c..3b927e8c4 100644 --- a/plugins/inputs/couchbase/couchbase_test.go +++ b/plugins/inputs/couchbase/couchbase_test.go @@ -43,7 +43,7 @@ func TestGatherServer(t *testing.T) { require.NoError(t, err) var acc testutil.Accumulator - err = cb.gatherServer(fakeServer.URL, &acc, &pool) + err = cb.gatherServer(&acc, fakeServer.URL, &pool) require.NoError(t, err) acc.AssertContainsTaggedFields(t, "couchbase_node", map[string]interface{}{"memory_free": 23181365248.0, "memory_total": 64424656896.0}, diff --git a/plugins/inputs/cpu/README.md b/plugins/inputs/cpu/README.md index bc86ae898..8e2ef66f9 100644 --- a/plugins/inputs/cpu/README.md +++ b/plugins/inputs/cpu/README.md @@ -4,14 +4,15 @@ The `cpu` plugin gather metrics on the system CPUs. #### Configuration ```toml +# Read metrics about cpu usage [[inputs.cpu]] ## Whether to report per-cpu stats or not percpu = true ## Whether to report total system cpu stats or not totalcpu = true - ## If true, collect raw CPU time metrics. + ## If true, collect raw CPU time metrics collect_cpu_time = false - ## If true, compute and report the sum of all non-idle CPU states. + ## If true, compute and report the sum of all non-idle CPU states report_active = false ``` diff --git a/plugins/inputs/cpu/cpu.go b/plugins/inputs/cpu/cpu.go index 3fcdb3db4..9e795c82a 100644 --- a/plugins/inputs/cpu/cpu.go +++ b/plugins/inputs/cpu/cpu.go @@ -4,15 +4,16 @@ import ( "fmt" "time" + cpuUtil "github.com/shirou/gopsutil/cpu" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/system" - "github.com/shirou/gopsutil/cpu" ) type CPUStats struct { ps system.PS - lastStats map[string]cpu.TimesStat + lastStats map[string]cpuUtil.TimesStat PerCPU bool `toml:"percpu"` TotalCPU bool `toml:"totalcpu"` @@ -123,7 +124,7 @@ func (c *CPUStats) Gather(acc telegraf.Accumulator) error { acc.AddGauge("cpu", fieldsG, tags, now) } - c.lastStats = make(map[string]cpu.TimesStat) + c.lastStats = make(map[string]cpuUtil.TimesStat) for _, cts := range times { c.lastStats[cts.CPU] = cts } @@ -131,12 +132,12 @@ func (c *CPUStats) Gather(acc telegraf.Accumulator) error { return err } -func totalCPUTime(t cpu.TimesStat) float64 { +func totalCPUTime(t cpuUtil.TimesStat) float64 { total := t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + t.Idle return total } -func activeCPUTime(t cpu.TimesStat) float64 { +func activeCPUTime(t cpuUtil.TimesStat) float64 { active := totalCPUTime(t) - t.Idle return active } diff --git a/plugins/inputs/cpu/cpu_test.go b/plugins/inputs/cpu/cpu_test.go index d3849a519..e51660a0a 100644 --- a/plugins/inputs/cpu/cpu_test.go +++ b/plugins/inputs/cpu/cpu_test.go @@ -4,11 +4,11 @@ import ( "fmt" "testing" + cpuUtil "github.com/shirou/gopsutil/cpu" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/plugins/inputs/system" "github.com/influxdata/telegraf/testutil" - "github.com/shirou/gopsutil/cpu" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestCPUStats(t *testing.T) { @@ -16,7 +16,7 @@ func TestCPUStats(t *testing.T) { defer mps.AssertExpectations(t) var acc testutil.Accumulator - cts := cpu.TimesStat{ + cts := cpuUtil.TimesStat{ CPU: "cpu0", User: 8.8, System: 8.2, @@ -30,7 +30,7 @@ func TestCPUStats(t *testing.T) { GuestNice: 0.324, } - cts2 := cpu.TimesStat{ + cts2 := cpuUtil.TimesStat{ CPU: "cpu0", User: 24.9, // increased by 16.1 System: 10.9, // increased by 2.7 @@ -44,7 +44,7 @@ func TestCPUStats(t *testing.T) { GuestNice: 2.524, // increased by 2.2 } - mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) + mps.On("CPUTimes").Return([]cpuUtil.TimesStat{cts}, nil) cs := NewCPUStats(&mps) @@ -66,7 +66,7 @@ func TestCPUStats(t *testing.T) { assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0) mps2 := system.MockPS{} - mps2.On("CPUTimes").Return([]cpu.TimesStat{cts2}, nil) + mps2.On("CPUTimes").Return([]cpuUtil.TimesStat{cts2}, nil) cs.ps = &mps2 // Should have added cpu percentages too @@ -131,8 +131,7 @@ func assertContainsTaggedFloat( return } } else { - assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64", - measurement)) + require.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64", measurement)) } } } @@ -141,7 +140,7 @@ func assertContainsTaggedFloat( msg := fmt.Sprintf( "Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f", measurement, delta, expectedValue, actualValue) - assert.Fail(t, msg) + require.Fail(t, msg) } // TestCPUCountChange tests that no errors are encountered if the number of @@ -155,7 +154,7 @@ func TestCPUCountIncrease(t *testing.T) { cs := NewCPUStats(&mps) mps.On("CPUTimes").Return( - []cpu.TimesStat{ + []cpuUtil.TimesStat{ { CPU: "cpu0", }, @@ -165,7 +164,7 @@ func TestCPUCountIncrease(t *testing.T) { require.NoError(t, err) mps2.On("CPUTimes").Return( - []cpu.TimesStat{ + []cpuUtil.TimesStat{ { CPU: "cpu0", }, @@ -186,28 +185,28 @@ func TestCPUTimesDecrease(t *testing.T) { defer mps.AssertExpectations(t) var acc testutil.Accumulator - cts := cpu.TimesStat{ + cts := cpuUtil.TimesStat{ CPU: "cpu0", User: 18, Idle: 80, Iowait: 2, } - cts2 := cpu.TimesStat{ + cts2 := cpuUtil.TimesStat{ CPU: "cpu0", User: 38, // increased by 20 Idle: 40, // decreased by 40 Iowait: 1, // decreased by 1 } - cts3 := cpu.TimesStat{ + cts3 := cpuUtil.TimesStat{ CPU: "cpu0", User: 56, // increased by 18 Idle: 120, // increased by 80 Iowait: 3, // increased by 2 } - mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) + mps.On("CPUTimes").Return([]cpuUtil.TimesStat{cts}, nil) cs := NewCPUStats(&mps) @@ -221,7 +220,7 @@ func TestCPUTimesDecrease(t *testing.T) { assertContainsTaggedFloat(t, &acc, "time_iowait", 2, 0) mps2 := system.MockPS{} - mps2.On("CPUTimes").Return([]cpu.TimesStat{cts2}, nil) + mps2.On("CPUTimes").Return([]cpuUtil.TimesStat{cts2}, nil) cs.ps = &mps2 // CPU times decreased. An error should be raised @@ -229,7 +228,7 @@ func TestCPUTimesDecrease(t *testing.T) { require.Error(t, err) mps3 := system.MockPS{} - mps3.On("CPUTimes").Return([]cpu.TimesStat{cts3}, nil) + mps3.On("CPUTimes").Return([]cpuUtil.TimesStat{cts3}, nil) cs.ps = &mps3 err = cs.Gather(&acc) diff --git a/plugins/inputs/csgo/README.md b/plugins/inputs/csgo/README.md index dbf3f3fdf..b33550940 100644 --- a/plugins/inputs/csgo/README.md +++ b/plugins/inputs/csgo/README.md @@ -4,6 +4,7 @@ The `csgo` plugin gather metrics from Counter-Strike: Global Offensive servers. #### Configuration ```toml +# Fetch metrics from a CSGO SRCDS [[inputs.csgo]] ## Specify servers using the following format: ## servers = [ diff --git a/plugins/inputs/csgo/csgo.go b/plugins/inputs/csgo/csgo.go index 75cf8a924..59d1110ad 100644 --- a/plugins/inputs/csgo/csgo.go +++ b/plugins/inputs/csgo/csgo.go @@ -8,9 +8,10 @@ import ( "sync" "time" + "github.com/james4k/rcon" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/james4k/rcon" ) type statsData struct { @@ -30,7 +31,7 @@ type CSGO struct { Servers [][]string `toml:"servers"` } -func (_ *CSGO) Description() string { +func (*CSGO) Description() string { return "Fetch metrics from a CSGO SRCDS" } @@ -45,7 +46,7 @@ var sampleConfig = ` servers = [] ` -func (_ *CSGO) SampleConfig() string { +func (*CSGO) SampleConfig() string { return sampleConfig } @@ -57,7 +58,7 @@ func (s *CSGO) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(ss []string) { defer wg.Done() - acc.AddError(s.gatherServer(ss, requestServer, acc)) + acc.AddError(s.gatherServer(acc, ss, requestServer)) }(server) } @@ -72,9 +73,9 @@ func init() { } func (s *CSGO) gatherServer( + acc telegraf.Accumulator, server []string, request func(string, string) (string, error), - acc telegraf.Accumulator, ) error { if len(server) != 2 { return errors.New("incorrect server config") diff --git a/plugins/inputs/csgo/csgo_test.go b/plugins/inputs/csgo/csgo_test.go index 311e4b2b6..b1d1c9b69 100644 --- a/plugins/inputs/csgo/csgo_test.go +++ b/plugins/inputs/csgo/csgo_test.go @@ -19,7 +19,7 @@ var ( func TestCPUStats(t *testing.T) { c := NewCSGOStats() var acc testutil.Accumulator - err := c.gatherServer(c.Servers[0], requestMock, &acc) + err := c.gatherServer(&acc, c.Servers[0], requestMock) if err != nil { t.Error(err) }