Linter fixes for plugins/inputs/[c]* (#9194)

* Linter fixes for plugins/inputs/[c]*

Co-authored-by: Pawel Zak <Pawel Zak>
Paweł Żak 2021-04-28 03:41:52 +02:00 committed by GitHub
parent 5256f916eb
commit 1fabc5f1fe
24 changed files with 446 additions and 398 deletions

@@ -19,10 +19,26 @@ Cassandra plugin produces one or more measurements for each metric configured, a
 Given a configuration like:
 
 ```toml
+# Read Cassandra metrics through Jolokia
 [[inputs.cassandra]]
+  ## DEPRECATED: The cassandra plugin has been deprecated. Please use the
+  ## jolokia2 plugin instead.
+  ##
+  ## see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/jolokia2
   context = "/jolokia/read"
-  servers = [":8778"]
-  metrics = ["/java.lang:type=Memory/HeapMemoryUsage"]
+  ## List of cassandra servers exposing jolokia read service
+  servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"]
+  ## List of metrics collected on above servers
+  ## Each metric consists of a jmx path.
+  ## This will collect all heap memory usage metrics from the jvm and
+  ## ReadLatency metrics for all keyspaces and tables.
+  ## "type=Table" in the query works with Cassandra3.0. Older versions might
+  ## need to use "type=ColumnFamily"
+  metrics = [
+    "/java.lang:type=Memory/HeapMemoryUsage",
+    "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
+  ]
 ```
 
 The collected metrics will be:
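
For context on what those `metrics` entries mean at runtime: the plugin issues an HTTP GET per metric against each server's Jolokia read endpoint, with the JMX path appended to `context`. Below is a minimal, hypothetical sketch of how such a request URL could be assembled; `buildReadURL` and its credential handling are illustrative assumptions, not the plugin's actual code.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildReadURL is a hypothetical helper: it combines a server entry such as
// "myuser:mypassword@10.10.10.1:8778" with the jolokia context and a JMX path.
func buildReadURL(server, context, metric string) string {
	host := server
	var user *url.Userinfo
	if i := strings.LastIndex(server, "@"); i != -1 {
		if creds := strings.SplitN(server[:i], ":", 2); len(creds) == 2 {
			user = url.UserPassword(creds[0], creds[1])
		}
		host = server[i+1:]
	}
	u := &url.URL{
		Scheme: "http",
		User:   user,
		Host:   host, // an entry like ":8778" implies localhost
		Path:   strings.TrimRight(context, "/") + metric,
	}
	return u.String()
}

func main() {
	fmt.Println(buildReadURL(
		"myuser:mypassword@10.10.10.1:8778",
		"/jolokia/read",
		"/java.lang:type=Memory/HeapMemoryUsage",
	))
}
```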

@@ -49,13 +49,11 @@ type jmxMetric interface {
 	addTagsFields(out map[string]interface{})
 }
 
-func newJavaMetric(host string, metric string,
-	acc telegraf.Accumulator) *javaMetric {
+func newJavaMetric(acc telegraf.Accumulator, host string, metric string) *javaMetric {
 	return &javaMetric{host: host, metric: metric, acc: acc}
 }
 
-func newCassandraMetric(host string, metric string,
-	acc telegraf.Accumulator) *cassandraMetric {
+func newCassandraMetric(acc telegraf.Accumulator, host string, metric string) *cassandraMetric {
 	return &cassandraMetric{host: host, metric: metric, acc: acc}
 }
@@ -72,13 +70,15 @@ func addValuesAsFields(values map[string]interface{}, fields map[string]interfac
 func parseJmxMetricRequest(mbean string) map[string]string {
 	tokens := make(map[string]string)
 	classAndPairs := strings.Split(mbean, ":")
-	if classAndPairs[0] == "org.apache.cassandra.metrics" {
+	switch classAndPairs[0] {
+	case "org.apache.cassandra.metrics":
 		tokens["class"] = "cassandra"
-	} else if classAndPairs[0] == "java.lang" {
+	case "java.lang":
 		tokens["class"] = "java"
-	} else {
+	default:
 		return tokens
 	}
+
 	pairs := strings.Split(classAndPairs[1], ",")
 	for _, pair := range pairs {
 		p := strings.Split(pair, "=")
@@ -147,22 +147,21 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
 	// maps in the json response
 	if (tokens["type"] == "Table" || tokens["type"] == "ColumnFamily") && (tokens["keyspace"] == "*" ||
 		tokens["scope"] == "*") {
-		if valuesMap, ok := out["value"]; ok {
-			for k, v := range valuesMap.(map[string]interface{}) {
-				addCassandraMetric(k, c, v.(map[string]interface{}))
-			}
-		} else {
+		valuesMap, ok := out["value"]
+		if !ok {
 			c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out))
 			return
 		}
+		for k, v := range valuesMap.(map[string]interface{}) {
+			addCassandraMetric(k, c, v.(map[string]interface{}))
+		}
 	} else {
-		if values, ok := out["value"]; ok {
-			addCassandraMetric(r.(map[string]interface{})["mbean"].(string),
-				c, values.(map[string]interface{}))
-		} else {
+		values, ok := out["value"]
+		if !ok {
 			c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out))
 			return
 		}
+		addCassandraMetric(r.(map[string]interface{})["mbean"].(string), c, values.(map[string]interface{}))
 	}
 }
@@ -277,10 +276,10 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
 			var m jmxMetric
 			if strings.HasPrefix(metric, "/java.lang:") {
-				m = newJavaMetric(serverTokens["host"], metric, acc)
+				m = newJavaMetric(acc, serverTokens["host"], metric)
 			} else if strings.HasPrefix(metric,
 				"/org.apache.cassandra.metrics:") {
-				m = newCassandraMetric(serverTokens["host"], metric, acc)
+				m = newCassandraMetric(acc, serverTokens["host"], metric)
 			} else {
 				// unsupported metric type
 				acc.AddError(fmt.Errorf("unsupported Cassandra metric [%s], skipping", metric))
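
The addTagsFields rewrite above applies a guard-clause pattern: test the failure case first, report and return, and keep the success path at the top indentation level. A self-contained sketch of the same shape (the names here are illustrative, not the plugin's):

```go
package main

import (
	"errors"
	"fmt"
)

// processValue reports an error and returns early when the "value" key is
// absent, instead of nesting the happy path inside an if/else.
func processValue(out map[string]interface{}) error {
	value, ok := out["value"]
	if !ok {
		return errors.New(`missing key "value" in output response`)
	}
	for k, v := range value.(map[string]interface{}) {
		fmt.Printf("metric %s = %v\n", k, v)
	}
	return nil
}

func main() {
	sample := map[string]interface{}{
		"value": map[string]interface{}{"HeapMemoryUsage": 42},
	}
	if err := processValue(sample); err != nil {
		fmt.Println(err)
	}
}
```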

@@ -45,7 +45,7 @@ the cluster. The currently supported commands are:
 ### Configuration:
 
 ```toml
-# Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
+# Collects performance metrics from the MON, OSD, MDS and RGW nodes in a Ceph storage cluster.
 [[inputs.ceph]]
   ## This is the recommended interval to poll. Too frequent and you will lose
   ## data points due to timeouts during rebalancing and recovery
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"log"
 	"os/exec"
 	"path/filepath"
 	"strings"
@@ -28,17 +27,19 @@ const (
 )
 
 type Ceph struct {
-	CephBinary             string
-	OsdPrefix              string
-	MonPrefix              string
-	MdsPrefix              string
-	RgwPrefix              string
-	SocketDir              string
-	SocketSuffix           string
-	CephUser               string
-	CephConfig             string
-	GatherAdminSocketStats bool
-	GatherClusterStats     bool
+	CephBinary             string `toml:"ceph_binary"`
+	OsdPrefix              string `toml:"osd_prefix"`
+	MonPrefix              string `toml:"mon_prefix"`
+	MdsPrefix              string `toml:"mds_prefix"`
+	RgwPrefix              string `toml:"rgw_prefix"`
+	SocketDir              string `toml:"socket_dir"`
+	SocketSuffix           string `toml:"socket_suffix"`
+	CephUser               string `toml:"ceph_user"`
+	CephConfig             string `toml:"ceph_config"`
+	GatherAdminSocketStats bool   `toml:"gather_admin_socket_stats"`
+	GatherClusterStats     bool   `toml:"gather_cluster_stats"`
+
+	Log telegraf.Logger `toml:"-"`
 }
 
 func (c *Ceph) Description() string {
@@ -67,7 +68,14 @@ var sampleConfig = `
   ## suffix used to identify socket files
   socket_suffix = "asok"
 
-  ## Ceph user to authenticate as
+  ## Ceph user to authenticate as, ceph will search for the corresponding keyring
+  ## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the
+  ## client section of ceph.conf for example:
+  ##
+  ##     [client.telegraf]
+  ##         keyring = /etc/ceph/client.telegraf.keyring
+  ##
+  ## Consult the ceph documentation for more detail on keyring generation.
   ceph_user = "client.admin"
 
   ## Ceph configuration to use to locate the cluster
@@ -76,7 +84,8 @@ var sampleConfig = `
   ## Whether to gather statistics via the admin socket
   gather_admin_socket_stats = true
 
-  ## Whether to gather statistics via ceph commands
+  ## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config
+  ## to be specified
   gather_cluster_stats = false
 `
@@ -112,14 +121,14 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
 			acc.AddError(fmt.Errorf("error reading from socket '%s': %v", s.socket, err))
 			continue
 		}
-		data, err := parseDump(dump)
+		data, err := c.parseDump(dump)
 		if err != nil {
 			acc.AddError(fmt.Errorf("error parsing dump from socket '%s': %v", s.socket, err))
 			continue
 		}
 		for tag, metrics := range data {
 			acc.AddFields(measurement,
-				map[string]interface{}(metrics),
+				metrics,
 				map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
 		}
 	}
@@ -138,7 +147,7 @@ func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error {
 	// For each job, execute against the cluster, parse and accumulate the data points
 	for _, job := range jobs {
-		output, err := c.exec(job.command)
+		output, err := c.execute(job.command)
 		if err != nil {
 			return fmt.Errorf("error executing command: %v", err)
 		}
@@ -171,15 +180,17 @@ func init() {
 var perfDump = func(binary string, socket *socket) (string, error) {
 	cmdArgs := []string{"--admin-daemon", socket.socket}
-	if socket.sockType == typeOsd {
+
+	switch socket.sockType {
+	case typeOsd:
 		cmdArgs = append(cmdArgs, "perf", "dump")
-	} else if socket.sockType == typeMon {
+	case typeMon:
 		cmdArgs = append(cmdArgs, "perfcounters_dump")
-	} else if socket.sockType == typeMds {
+	case typeMds:
 		cmdArgs = append(cmdArgs, "perf", "dump")
-	} else if socket.sockType == typeRgw {
+	case typeRgw:
 		cmdArgs = append(cmdArgs, "perf", "dump")
-	} else {
+	default:
 		return "", fmt.Errorf("ignoring unknown socket type: %s", socket.sockType)
 	}
@@ -268,23 +279,23 @@ type taggedMetricMap map[string]metricMap
 // Parses a raw JSON string into a taggedMetricMap
 // Delegates the actual parsing to newTaggedMetricMap(..)
-func parseDump(dump string) (taggedMetricMap, error) {
+func (c *Ceph) parseDump(dump string) (taggedMetricMap, error) {
 	data := make(map[string]interface{})
 
 	err := json.Unmarshal([]byte(dump), &data)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse json: '%s': %v", dump, err)
 	}
 
-	return newTaggedMetricMap(data), nil
+	return c.newTaggedMetricMap(data), nil
 }
 
 // Builds a TaggedMetricMap out of a generic string map.
 // The top-level key is used as a tag and all sub-keys are flattened into metrics
-func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
+func (c *Ceph) newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
 	tmm := make(taggedMetricMap)
 	for tag, datapoints := range data {
 		mm := make(metricMap)
-		for _, m := range flatten(datapoints) {
+		for _, m := range c.flatten(datapoints) {
 			mm[m.name()] = m.value
 		}
 		tmm[tag] = mm
@@ -296,7 +307,7 @@ func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
 // Nested keys are flattened into ordered slices associated with a metric value.
 // The key slices are treated as stacks, and are expected to be reversed and concatenated
 // when passed as metrics to the accumulator. (see (*metric).name())
-func flatten(data interface{}) []*metric {
+func (c *Ceph) flatten(data interface{}) []*metric {
 	var metrics []*metric
 
 	switch val := data.(type) {
@@ -305,20 +316,20 @@ func flatten(data interface{}) []*metric {
 	case map[string]interface{}:
 		metrics = make([]*metric, 0, len(val))
 		for k, v := range val {
-			for _, m := range flatten(v) {
+			for _, m := range c.flatten(v) {
 				m.pathStack = append(m.pathStack, k)
 				metrics = append(metrics, m)
 			}
 		}
 	default:
-		log.Printf("I! [inputs.ceph] ignoring unexpected type '%T' for value %v", val, val)
+		c.Log.Infof("ignoring unexpected type '%T' for value %v", val, val)
 	}
 
 	return metrics
 }
 
-// exec executes the 'ceph' command with the supplied arguments, returning JSON formatted output
-func (c *Ceph) exec(command string) (string, error) {
+// execute executes the 'ceph' command with the supplied arguments, returning JSON formatted output
+func (c *Ceph) execute(command string) (string, error) {
 	cmdArgs := []string{"--conf", c.CephConfig, "--name", c.CephUser, "--format", "json"}
 	cmdArgs = append(cmdArgs, strings.Split(command, " ")...)
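
The thread running through this file is dependency injection of the logger: `parseDump`, `newTaggedMetricMap`, and `flatten` become methods so they can reach the instance's `Log` field instead of the global `log` package. A reduced sketch of that wiring, with a hand-rolled interface standing in for `telegraf.Logger` (only `Infof` is modeled; the real interface has more methods):

```go
package main

import "log"

// Logger is a stand-in for telegraf.Logger; only Infof is sketched here.
type Logger interface {
	Infof(format string, args ...interface{})
}

type stdLogger struct{ prefix string }

func (l stdLogger) Infof(format string, args ...interface{}) {
	log.Printf("I! ["+l.prefix+"] "+format, args...)
}

// Ceph mirrors the plugin struct: the logger travels with the instance,
// so methods like flatten can log without a package-level logger.
type Ceph struct {
	Log Logger
}

func (c *Ceph) flatten(data interface{}) {
	switch data.(type) {
	case float64, map[string]interface{}:
		// numeric and nested-map handling elided
	default:
		c.Log.Infof("ignoring unexpected type '%T' for value %v", data, data)
	}
}

func main() {
	c := &Ceph{Log: stdLogger{prefix: "inputs.ceph"}}
	c.flatten("not a number")
}
```

Tests can then pass any Logger implementation, which is exactly what the test file below does with `testutil.Logger{}`.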

@@ -9,8 +9,9 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/influxdata/telegraf/testutil"
 	"github.com/stretchr/testify/require"
-	"github.com/influxdata/telegraf/testutil"
 )
 
 const (
@@ -29,28 +30,32 @@ func TestParseSockId(t *testing.T) {
 }
 
 func TestParseMonDump(t *testing.T) {
-	dump, err := parseDump(monPerfDump)
+	c := &Ceph{Log: testutil.Logger{}}
+	dump, err := c.parseDump(monPerfDump)
 	require.NoError(t, err)
 	require.InEpsilon(t, int64(5678670180), dump["cluster"]["osd_kb_used"], epsilon)
 	require.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon)
 }
 
 func TestParseOsdDump(t *testing.T) {
-	dump, err := parseDump(osdPerfDump)
+	c := &Ceph{Log: testutil.Logger{}}
+	dump, err := c.parseDump(osdPerfDump)
 	require.NoError(t, err)
 	require.InEpsilon(t, 552132.109360000, dump["filestore"]["commitcycle_interval.sum"], epsilon)
 	require.Equal(t, float64(0), dump["mutex-FileJournal::finisher_lock"]["wait.avgcount"])
 }
 
 func TestParseMdsDump(t *testing.T) {
-	dump, err := parseDump(mdsPerfDump)
+	c := &Ceph{Log: testutil.Logger{}}
+	dump, err := c.parseDump(mdsPerfDump)
 	require.NoError(t, err)
 	require.InEpsilon(t, 2408386.600934982, dump["mds"]["reply_latency.sum"], epsilon)
 	require.Equal(t, float64(0), dump["throttle-write_buf_throttle"]["wait.avgcount"])
 }
 
 func TestParseRgwDump(t *testing.T) {
-	dump, err := parseDump(rgwPerfDump)
+	c := &Ceph{Log: testutil.Logger{}}
+	dump, err := c.parseDump(rgwPerfDump)
 	require.NoError(t, err)
 	require.InEpsilon(t, 0.002219876, dump["rgw"]["get_initial_lat.sum"], epsilon)
 	require.Equal(t, float64(0), dump["rgw"]["put_initial_lat.avgcount"])

@@ -44,12 +44,19 @@ All measurements have the following tags:
 ### Configuration:
 
 ```toml
+# Read specific statistics per cgroup
 # [[inputs.cgroup]]
+  ## Directories in which to look for files, globs are supported.
+  ## Consider restricting paths to the set of cgroups you really
+  ## want to monitor if you have a large number of cgroups, to avoid
+  ## any cardinality issues.
 # paths = [
-#   "/sys/fs/cgroup/memory",            # root cgroup
-#   "/sys/fs/cgroup/memory/child1",     # container cgroup
-#   "/sys/fs/cgroup/memory/child2/*",   # all children cgroups under child2, but not child2 itself
+#   "/sys/fs/cgroup/memory",
+#   "/sys/fs/cgroup/memory/child1",
+#   "/sys/fs/cgroup/memory/child2/*",
 # ]
+  ## cgroup stat fields, as file names, globs are supported.
+  ## these file names are appended to each path from above.
 # files = ["memory.*usage*", "memory.limit_in_bytes"]
 ```
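
Both `paths` and `files` accept globs, and every matched file under every matched directory is read. The sketch below approximates that expansion with the standard library; the real plugin walks paths with its own logic, so treat this only as a model of the cross product.

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	paths := []string{"/sys/fs/cgroup/memory", "/sys/fs/cgroup/memory/child2/*"}
	files := []string{"memory.*usage*", "memory.limit_in_bytes"}

	for _, p := range paths {
		dirs, err := filepath.Glob(p) // expand directory globs first
		if err != nil {
			fmt.Println("bad pattern:", err)
			continue
		}
		for _, dir := range dirs {
			for _, f := range files {
				matches, _ := filepath.Glob(filepath.Join(dir, f))
				for _, m := range matches {
					fmt.Println("would read:", m) // one stat file per match
				}
			}
		}
	}
}
```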

@@ -25,7 +25,7 @@ func (g *CGroup) Gather(acc telegraf.Accumulator) error {
 			acc.AddError(dir.err)
 			continue
 		}
-		if err := g.gatherDir(dir.path, acc); err != nil {
+		if err := g.gatherDir(acc, dir.path); err != nil {
 			acc.AddError(err)
 		}
 	}
@@ -33,7 +33,7 @@ func (g *CGroup) Gather(acc telegraf.Accumulator) error {
 	return nil
 }
 
-func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error {
+func (g *CGroup) gatherDir(acc telegraf.Accumulator, dir string) error {
 	fields := make(map[string]interface{})
 
 	list := make(chan pathInfo)
@@ -72,8 +72,8 @@ type pathInfo struct {
 	err  error
 }
 
-func isDir(path string) (bool, error) {
-	result, err := os.Stat(path)
+func isDir(pathToCheck string) (bool, error) {
+	result, err := os.Stat(pathToCheck)
 	if err != nil {
 		return false, err
 	}
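
The `isDir(path string)` to `isDir(pathToCheck string)` rename is about shadowing: a parameter named `path` hides any imported `path` package for the whole function body, which the linter flags. In miniature (whether this particular file imports `path` is incidental; the rename heads off the collision):

```go
package main

import (
	"fmt"
	"path"
)

// With a parameter named "path", the identifier would shadow the imported
// package inside the function body, so path.Ext below would not compile.
// Renaming the parameter keeps both usable.
func hasExt(pathToCheck string) bool {
	return path.Ext(pathToCheck) != ""
}

func main() {
	fmt.Println(hasExt("/sys/fs/cgroup/memory.limit_in_bytes"))
}
```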

@@ -51,7 +51,7 @@ func TestGather(t *testing.T) {
 	acc.AssertContainsTaggedFields(t, "chrony", fields, tags)
 }
 
-// fackeExecCommand is a helper function that mock
+// fakeExecCommand is a helper function that mock
 // the exec.Command call (and call the test binary)
 func fakeExecCommand(command string, args ...string) *exec.Cmd {
 	cs := []string{"-test.run=TestHelperProcess", "--", command}
@@ -103,7 +103,9 @@ Leap status     : Not synchronized
 	} else {
 		//nolint:errcheck,revive // test will fail anyway
 		fmt.Fprint(os.Stdout, "command not found")
+		//nolint:revive // error code is important for this "test"
 		os.Exit(1)
 	}
+	//nolint:revive // error code is important for this "test"
 	os.Exit(0)
 }
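
`fakeExecCommand` is the standard library's own trick for faking `exec.Command` in tests: re-invoke the running test binary restricted to `TestHelperProcess`, which prints canned output and exits. A condensed, self-contained version of the pattern (the chrony test's helper differs in its details):

```go
package main_test

import (
	"fmt"
	"os"
	"os/exec"
	"testing"
)

// fakeExecCommand re-runs the test binary so TestHelperProcess can stand in
// for the real external command.
func fakeExecCommand(command string, args ...string) *exec.Cmd {
	cs := []string{"-test.run=TestHelperProcess", "--", command}
	cs = append(cs, args...)
	cmd := exec.Command(os.Args[0], cs...)
	cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
	return cmd
}

func TestFakeChronyc(t *testing.T) {
	out, err := fakeExecCommand("chronyc", "tracking").Output()
	if err != nil {
		t.Fatal(err)
	}
	if string(out) == "" {
		t.Fatal("expected canned output")
	}
}

// TestHelperProcess is not a real test: it only runs when invoked by
// fakeExecCommand, detected via the environment variable.
func TestHelperProcess(t *testing.T) {
	if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
		return
	}
	fmt.Fprint(os.Stdout, "Reference ID : 192.168.1.22 (ntp.example.org)\n")
	os.Exit(0)
}
```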

@@ -15,15 +15,16 @@ import (
 	dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout"
 	telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
-	"github.com/golang/protobuf/proto"
-	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/metric"
-	internaltls "github.com/influxdata/telegraf/plugins/common/tls"
-	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/golang/protobuf/proto" //nolint:staticcheck // Cannot switch to "google.golang.org/protobuf/proto", "github.com/golang/protobuf/proto" is used by "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	_ "google.golang.org/grpc/encoding/gzip" // Register GRPC gzip decoder to support compressed telemetry
 	"google.golang.org/grpc/peer"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
+	internaltls "github.com/influxdata/telegraf/plugins/common/tls"
+	"github.com/influxdata/telegraf/plugins/inputs"
 )
@@ -51,15 +52,15 @@ type CiscoTelemetryMDT struct {
 	listener net.Listener
 
 	// Internal state
-	aliases   map[string]string
+	internalAliases map[string]string
 	dmesFuncs map[string]string
 	warned    map[string]struct{}
 	extraTags map[string]map[string]struct{}
 	nxpathMap map[string]map[string]string //per path map
 	propMap   map[string]func(field *telemetry.TelemetryField, value interface{}) interface{}
 	mutex     sync.Mutex
 	acc       telegraf.Accumulator
 	wg        sync.WaitGroup
 }
 
 type NxPayloadXfromStructure struct {
@@ -87,9 +88,9 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
 	// Invert aliases list
 	c.warned = make(map[string]struct{})
-	c.aliases = make(map[string]string, len(c.Aliases))
+	c.internalAliases = make(map[string]string, len(c.Aliases))
 	for alias, encodingPath := range c.Aliases {
-		c.aliases[encodingPath] = alias
+		c.internalAliases[encodingPath] = alias
 	}
 
 	c.initDb()
@@ -276,9 +277,9 @@ func (c *CiscoTelemetryMDT) handleTCPClient(conn net.Conn) error {
 // MdtDialout RPC server method for grpc-dialout transport
 func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutServer) error {
-	peer, peerOK := peer.FromContext(stream.Context())
+	peerInCtx, peerOK := peer.FromContext(stream.Context())
 	if peerOK {
-		c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peer.Addr)
+		c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr)
 	}
 
 	var chunkBuffer bytes.Buffer
@@ -314,7 +315,7 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS
 	}
 
 	if peerOK {
-		c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peer.Addr)
+		c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr)
 	}
 
 	return nil
@@ -375,8 +376,8 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
 		}
 	}
 
-	for _, metric := range grouper.Metrics() {
-		c.acc.AddMetric(metric)
+	for _, groupedMetric := range grouper.Metrics() {
+		c.acc.AddMetric(groupedMetric)
 	}
 }
@@ -540,7 +541,7 @@ func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, fie
 	if value := decodeValue(field); value != nil {
 		// Do alias lookup, to shorten measurement names
 		measurement := encodingPath
-		if alias, ok := c.aliases[encodingPath]; ok {
+		if alias, ok := c.internalAliases[encodingPath]; ok {
 			measurement = alias
 		} else {
 			c.mutex.Lock()
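
The `aliases` to `internalAliases` rename also makes the mechanism easier to follow: the user-facing `Aliases` map (alias to encoding path) is inverted once at startup so each incoming message costs a single map lookup. Detached from the plugin, the inversion and lookup look like this (the example path is made up):

```go
package main

import "fmt"

func main() {
	// User config maps alias -> encoding path.
	aliases := map[string]string{
		"ifstats": "ietf-interfaces:interfaces-state/interface/statistics",
	}

	// Invert once: encoding path -> alias, for cheap per-message lookup.
	internalAliases := make(map[string]string, len(aliases))
	for alias, encodingPath := range aliases {
		internalAliases[encodingPath] = alias
	}

	encodingPath := "ietf-interfaces:interfaces-state/interface/statistics"
	measurement := encodingPath
	if alias, ok := internalAliases[encodingPath]; ok {
		measurement = alias // shorten the measurement name
	}
	fmt.Println(measurement)
}
```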

@@ -9,11 +9,12 @@ import (
 	"testing"
 
 	dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout"
-	telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
-	"github.com/golang/protobuf/proto"
+	telemetryBis "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
+	"github.com/golang/protobuf/proto" //nolint:staticcheck // Cannot switch to "google.golang.org/protobuf/proto", "github.com/golang/protobuf/proto" is used by "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis"
+	"github.com/influxdata/telegraf/testutil"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
-
-	"github.com/influxdata/telegraf/testutil"
 )
@@ -23,55 +24,55 @@ func TestHandleTelemetryTwoSimple(t *testing.T) {
 	// error is expected since we are passing in dummy transport
 	require.Error(t, err)
 
-	telemetry := &telemetry.Telemetry{
+	telemetry := &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "type:model/some/path",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "name",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "str"},
 							},
 							{
 								Name:        "uint64",
-								ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 1234},
+								ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 1234},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "bool",
-								ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: true},
+								ValueByType: &telemetryBis.TelemetryField_BoolValue{BoolValue: true},
 							},
 						},
 					},
 				},
 			},
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "name",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str2"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "str2"},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "bool",
-								ValueByType: &telemetry.TelemetryField_BoolValue{BoolValue: false},
+								ValueByType: &telemetryBis.TelemetryField_BoolValue{BoolValue: false},
 							},
 						},
 					},
@@ -101,26 +102,26 @@ func TestHandleTelemetrySingleNested(t *testing.T) {
 	// error is expected since we are passing in dummy transport
 	require.Error(t, err)
 
-	telemetry := &telemetry.Telemetry{
+	telemetry := &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "type:model/nested/path",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name: "nested",
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name: "key",
-										Fields: []*telemetry.TelemetryField{
+										Fields: []*telemetryBis.TelemetryField{
 											{
 												Name:        "level",
-												ValueByType: &telemetry.TelemetryField_DoubleValue{DoubleValue: 3},
+												ValueByType: &telemetryBis.TelemetryField_DoubleValue{DoubleValue: 3},
 											},
 										},
 									},
@@ -130,16 +131,16 @@ func TestHandleTelemetrySingleNested(t *testing.T) {
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name: "nested",
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name: "value",
-										Fields: []*telemetry.TelemetryField{
+										Fields: []*telemetryBis.TelemetryField{
 											{
 												Name:        "foo",
-												ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+												ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 											},
 										},
 									},
@@ -169,49 +170,49 @@ func TestHandleEmbeddedTags(t *testing.T) {
 	// error is expected since we are passing in dummy transport
 	require.Error(t, err)
 
-	telemetry := &telemetry.Telemetry{
+	telemetry := &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "type:model/extra",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "foo",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name: "list",
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name:        "name",
-										ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry1"},
+										ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "entry1"},
 									},
 									{
 										Name:        "test",
-										ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"},
+										ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"},
 									},
 								},
 							},
 							{
 								Name: "list",
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name:        "name",
-										ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry2"},
+										ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "entry2"},
 									},
 									{
 										Name:        "test",
-										ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+										ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 									},
 								},
 							},
@@ -242,57 +243,57 @@ func TestHandleNXAPI(t *testing.T) {
 	// error is expected since we are passing in dummy transport
 	require.Error(t, err)
 
-	telemetry := &telemetry.Telemetry{
+	telemetry := &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "show nxapi",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "foo",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name: "TABLE_nxapi",
-										Fields: []*telemetry.TelemetryField{
+										Fields: []*telemetryBis.TelemetryField{
 											{
-												Fields: []*telemetry.TelemetryField{
+												Fields: []*telemetryBis.TelemetryField{
 													{
 														Name: "ROW_nxapi",
-														Fields: []*telemetry.TelemetryField{
+														Fields: []*telemetryBis.TelemetryField{
 															{
-																Fields: []*telemetry.TelemetryField{
+																Fields: []*telemetryBis.TelemetryField{
 																	{
 																		Name:        "index",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i1"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "i1"},
 																	},
 																	{
 																		Name:        "value",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"},
 																	},
 																},
 															},
 															{
-																Fields: []*telemetry.TelemetryField{
+																Fields: []*telemetryBis.TelemetryField{
 																	{
 																		Name:        "index",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i2"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "i2"},
 																	},
 																	{
 																		Name:        "value",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 																	},
 																},
 															},
@@ -331,45 +332,45 @@ func TestHandleNXAPIXformNXAPI(t *testing.T) {
 	// error is expected since we are passing in dummy transport
 	require.Error(t, err)
 
-	telemetry := &telemetry.Telemetry{
+	telemetry := &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "show processes cpu",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "foo",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name: "TABLE_process_cpu",
-										Fields: []*telemetry.TelemetryField{
+										Fields: []*telemetryBis.TelemetryField{
 											{
-												Fields: []*telemetry.TelemetryField{
+												Fields: []*telemetryBis.TelemetryField{
 													{
 														Name: "ROW_process_cpu",
-														Fields: []*telemetry.TelemetryField{
+														Fields: []*telemetryBis.TelemetryField{
 															{
-																Fields: []*telemetry.TelemetryField{
+																Fields: []*telemetryBis.TelemetryField{
 																	{
 																		Name:        "index",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i1"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "i1"},
 																	},
 																	{
 																		Name:        "value",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"},
 																	},
 																},
 															},
@@ -405,57 +406,57 @@ func TestHandleNXXformMulti(t *testing.T) {
 	// error is expected since we are passing in dummy transport
 	require.Error(t, err)
 
-	telemetry := &telemetry.Telemetry{
+	telemetry := &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "sys/lldp",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "foo",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name: "fooEntity",
-										Fields: []*telemetry.TelemetryField{
+										Fields: []*telemetryBis.TelemetryField{
 											{
-												Fields: []*telemetry.TelemetryField{
+												Fields: []*telemetryBis.TelemetryField{
 													{
 														Name: "attributes",
-														Fields: []*telemetry.TelemetryField{
+														Fields: []*telemetryBis.TelemetryField{
 															{
-																Fields: []*telemetry.TelemetryField{
+																Fields: []*telemetryBis.TelemetryField{
 																	{
 																		Name:        "rn",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "some-rn"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "some-rn"},
 																	},
 																	{
 																		Name:        "portIdV",
-																		ValueByType: &telemetry.TelemetryField_Uint32Value{Uint32Value: 12},
+																		ValueByType: &telemetryBis.TelemetryField_Uint32Value{Uint32Value: 12},
 																	},
 																	{
 																		Name:        "portDesc",
-																		ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 100},
+																		ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 100},
 																	},
 																	{
 																		Name:        "test",
-																		ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 281474976710655},
+																		ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 281474976710655},
 																	},
 																	{
 																		Name:        "subscriptionId",
-																		ValueByType: &telemetry.TelemetryField_Uint64Value{Uint64Value: 2814749767106551},
+																		ValueByType: &telemetryBis.TelemetryField_Uint64Value{Uint64Value: 2814749767106551},
 																	},
 																},
 															},
@@ -490,45 +491,45 @@ func TestHandleNXDME(t *testing.T) {
 	// error is expected since we are passing in dummy transport
 	require.Error(t, err)
 
-	telemetry := &telemetry.Telemetry{
+	telemetry := &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "sys/dme",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "foo",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "bar"},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
-								Fields: []*telemetry.TelemetryField{
+								Fields: []*telemetryBis.TelemetryField{
 									{
 										Name: "fooEntity",
-										Fields: []*telemetry.TelemetryField{
+										Fields: []*telemetryBis.TelemetryField{
 											{
-												Fields: []*telemetry.TelemetryField{
+												Fields: []*telemetryBis.TelemetryField{
 													{
 														Name: "attributes",
-														Fields: []*telemetry.TelemetryField{
+														Fields: []*telemetryBis.TelemetryField{
 															{
-																Fields: []*telemetry.TelemetryField{
+																Fields: []*telemetryBis.TelemetryField{
 																	{
 																		Name:        "rn",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "some-rn"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "some-rn"},
 																	},
 																	{
 																		Name:        "value",
-																		ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"},
+																		ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "foo"},
 																	},
 																},
 															},
@@ -584,30 +585,30 @@ func TestTCPDialoutOverflow(t *testing.T) {
 	require.Contains(t, acc.Errors, errors.New("dialout packet too long: 1000000000"))
 }
 
-func mockTelemetryMessage() *telemetry.Telemetry {
-	return &telemetry.Telemetry{
+func mockTelemetryMessage() *telemetryBis.Telemetry {
+	return &telemetryBis.Telemetry{
 		MsgTimestamp: 1543236572000,
 		EncodingPath: "type:model/some/path",
-		NodeId:       &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
-		Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
-		DataGpbkv: []*telemetry.TelemetryField{
+		NodeId:       &telemetryBis.Telemetry_NodeIdStr{NodeIdStr: "hostname"},
+		Subscription: &telemetryBis.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"},
+		DataGpbkv: []*telemetryBis.TelemetryField{
 			{
-				Fields: []*telemetry.TelemetryField{
+				Fields: []*telemetryBis.TelemetryField{
 					{
 						Name: "keys",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "name",
-								ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "str"},
+								ValueByType: &telemetryBis.TelemetryField_StringValue{StringValue: "str"},
 							},
 						},
 					},
 					{
 						Name: "content",
-						Fields: []*telemetry.TelemetryField{
+						Fields: []*telemetryBis.TelemetryField{
 							{
 								Name:        "value",
-								ValueByType: &telemetry.TelemetryField_Sint64Value{Sint64Value: -1},
+								ValueByType: &telemetryBis.TelemetryField_Sint64Value{Sint64Value: -1},
 							},
 						},
 					},

@@ -139,6 +139,7 @@ func (c *CiscoTelemetryMDT) nxosValueXform(field *telemetry.TelemetryField, valu
 		return nil
 	//Xformation supported is only from String
 	case "float":
+		//nolint:revive // switch needed for `.(type)`
 		switch val := field.ValueByType.(type) {
 		case *telemetry.TelemetryField_StringValue:
 			if valf, err := strconv.ParseFloat(val.StringValue, 64); err == nil {

@@ -24,7 +24,7 @@ var defaultTimeout = 5 * time.Second
 var sampleConfig = `
   ## Username for authorization on ClickHouse server
-  ## example: username = "default""
+  ## example: username = "default"
   username = "default"
 
   ## Password for authorization on ClickHouse server
@@ -560,11 +560,11 @@ func (e *clickhouseError) Error() string {
 	return fmt.Sprintf("received error code %d: %s", e.StatusCode, e.body)
 }
 
-func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error {
-	q := url.Query()
+func (ch *ClickHouse) execQuery(address *url.URL, query string, i interface{}) error {
+	q := address.Query()
 	q.Set("query", query+" FORMAT JSON")
-	url.RawQuery = q.Encode()
-	req, _ := http.NewRequest("GET", url.String(), nil)
+	address.RawQuery = q.Encode()
+	req, _ := http.NewRequest("GET", address.String(), nil)
 	if ch.Username != "" {
 		req.Header.Add("X-ClickHouse-User", ch.Username)
 	}
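
Renaming the `execQuery` parameter from `url` to `address` fixes another shadowing case: with a parameter named `url`, the `net/url` package is unreachable inside the function. A miniature of the fixed shape:

```go
package main

import (
	"fmt"
	"net/url"
)

// With the parameter named "address" the net/url package stays reachable;
// had it been named "url", url.Parse below would fail to compile.
func execQuery(address *url.URL, query string) string {
	q := address.Query()
	q.Set("query", query+" FORMAT JSON")
	address.RawQuery = q.Encode()

	other, _ := url.Parse("http://127.0.0.1:8123") // package still usable
	_ = other
	return address.String()
}

func main() {
	u, _ := url.Parse("http://127.0.0.1:8123")
	fmt.Println(execQuery(u, "SELECT 1"))
}
```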

@@ -8,28 +8,28 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/influxdata/telegraf/testutil"
-	"github.com/stretchr/testify/assert"
 )
 
 func TestClusterIncludeExcludeFilter(t *testing.T) {
 	ch := ClickHouse{}
-	if assert.Equal(t, "", ch.clusterIncludeExcludeFilter()) {
+	require.Equal(t, "", ch.clusterIncludeExcludeFilter())
 	ch.ClusterExclude = []string{"test_cluster"}
-	assert.Equal(t, "WHERE cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter())
+	require.Equal(t, "WHERE cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter())
 
 	ch.ClusterExclude = []string{"test_cluster"}
 	ch.ClusterInclude = []string{"cluster"}
-	assert.Equal(t, "WHERE cluster IN ('cluster') OR cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter())
+	require.Equal(t, "WHERE cluster IN ('cluster') OR cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter())
 
 	ch.ClusterExclude = []string{}
 	ch.ClusterInclude = []string{"cluster1", "cluster2"}
-	assert.Equal(t, "WHERE cluster IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter())
+	require.Equal(t, "WHERE cluster IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter())
 
 	ch.ClusterExclude = []string{"cluster1", "cluster2"}
 	ch.ClusterInclude = []string{}
-	assert.Equal(t, "WHERE cluster NOT IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter())
-	}
+	require.Equal(t, "WHERE cluster NOT IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter())
 }
@@ -42,9 +42,9 @@ func TestChInt64(t *testing.T) {
 	}
 
 	for src, expected := range assets {
 		var v chUInt64
-		if err := v.UnmarshalJSON([]byte(src)); assert.NoError(t, err) {
-			assert.Equal(t, expected, uint64(v))
-		}
+		err := v.UnmarshalJSON([]byte(src))
+		require.NoError(t, err)
+		require.Equal(t, expected, uint64(v))
 	}
 }
@@ -74,7 +74,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.events"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -91,7 +91,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.metrics"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -108,7 +108,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.asynchronous_metrics"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -125,7 +125,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "zk_exists"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -136,7 +136,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "zk_root_nodes"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -147,7 +147,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "replication_queue_exists"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -158,7 +158,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "replication_too_many_tries_replicas"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -171,7 +171,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.detached_parts"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -182,7 +182,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.dictionaries"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -197,7 +197,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.mutations"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -212,7 +212,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.disks"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -229,7 +229,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.processes"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -258,7 +258,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "text_log_exists"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -269,7 +269,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "system.text_log"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -298,7 +298,7 @@ func TestGather(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		}
 	}))
 	ch = &ClickHouse{
@@ -309,7 +309,7 @@ func TestGather(t *testing.T) {
 		acc = &testutil.Accumulator{}
 	)
 	defer ts.Close()
-	assert.NoError(t, ch.Gather(acc))
+	require.NoError(t, ch.Gather(acc))
 
 	acc.AssertContainsTaggedFields(t, "clickhouse_tables",
 		map[string]interface{}{
@@ -451,7 +451,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "replication_queue_exists"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -462,7 +462,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		case strings.Contains(query, "text_log_exists"):
 			err := enc.Encode(result{
 				Data: []struct {
@@ -473,7 +473,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		}
 	}))
 	ch = &ClickHouse{
@@ -485,7 +485,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
 		acc = &testutil.Accumulator{}
 	)
 	defer ts.Close()
-	assert.NoError(t, ch.Gather(acc))
+	require.NoError(t, ch.Gather(acc))
 
 	acc.AssertDoesNotContainMeasurement(t, "clickhouse_zookeeper")
 	acc.AssertDoesNotContainMeasurement(t, "clickhouse_replication_queue")
@@ -503,7 +503,7 @@ func TestWrongJSONMarshalling(t *testing.T) {
 		err := enc.Encode(result{
 			Data: []struct{}{},
 		})
-		assert.NoError(t, err)
+		require.NoError(t, err)
 	}))
 	ch = &ClickHouse{
 		Servers: []string{
@@ -514,9 +514,9 @@ func TestWrongJSONMarshalling(t *testing.T) {
 		acc = &testutil.Accumulator{}
 	)
 	defer ts.Close()
-	assert.NoError(t, ch.Gather(acc))
+	require.NoError(t, ch.Gather(acc))
 
-	assert.Equal(t, 0, len(acc.Metrics))
+	require.Equal(t, 0, len(acc.Metrics))
 	allMeasurements := []string{
 		"clickhouse_events",
 		"clickhouse_metrics",
@@ -531,7 +531,7 @@ func TestWrongJSONMarshalling(t *testing.T) {
 		"clickhouse_processes",
 		"clickhouse_text_log",
 	}
-	assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors))
+	require.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors))
 }
 
 func TestOfflineServer(t *testing.T) {
@@ -547,9 +547,9 @@ func TestOfflineServer(t *testing.T) {
 		},
 	}
 	)
-	assert.NoError(t, ch.Gather(acc))
+	require.NoError(t, ch.Gather(acc))
 
-	assert.Equal(t, 0, len(acc.Metrics))
+	require.Equal(t, 0, len(acc.Metrics))
 	allMeasurements := []string{
 		"clickhouse_events",
 		"clickhouse_metrics",
@@ -564,7 +564,7 @@ func TestOfflineServer(t *testing.T) {
 		"clickhouse_processes",
 		"clickhouse_text_log",
 	}
-	assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors))
+	require.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors))
 }
 
 func TestAutoDiscovery(t *testing.T) {
@@ -574,8 +574,8 @@ func TestAutoDiscovery(t *testing.T) {
 			Data interface{} `json:"data"`
 		}
 		enc := json.NewEncoder(w)
-		switch query := r.URL.Query().Get("query"); {
-		case strings.Contains(query, "system.clusters"):
+		query := r.URL.Query().Get("query")
+		if strings.Contains(query, "system.clusters") {
 			err := enc.Encode(result{
 				Data: []struct {
 					Cluster string `json:"test"`
@@ -589,7 +589,7 @@ func TestAutoDiscovery(t *testing.T) {
 					},
 				},
 			})
-			assert.NoError(t, err)
+			require.NoError(t, err)
 		}
 	}))
 	ch = &ClickHouse{
@@ -602,5 +602,5 @@ func TestAutoDiscovery(t *testing.T) {
 		acc = &testutil.Accumulator{}
 	)
 	defer ts.Close()
-	assert.NoError(t, ch.Gather(acc))
+	require.NoError(t, ch.Gather(acc))
 }
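
The wholesale move from `assert` to `require` changes failure semantics: `assert` records a failure and keeps executing, while `require` calls `t.FailNow` and stops the test, which is why the old code had to wrap dependent checks in `if assert.Equal(...) { ... }`. A compact illustration (both packages are already test dependencies here; the bodies are skipped so the file stays green):

```go
package main_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestAssertContinues(t *testing.T) {
	t.Skip("illustration only: assert.Equal marks the test failed but execution continues")
	assert.Equal(t, 1, 2)
	t.Log("still reached after a failed assert")
}

func TestRequireStops(t *testing.T) {
	t.Skip("illustration only: require.Equal calls t.FailNow, ending the test at once")
	require.Equal(t, 1, 2)
	t.Log("never reached after a failed require")
}
```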

@ -16,6 +16,7 @@ API endpoint. In the following order the plugin will attempt to authenticate.
### Configuration: ### Configuration:
```toml ```toml
# Pull Metric Statistics from Amazon CloudWatch
[[inputs.cloudwatch]] [[inputs.cloudwatch]]
## Amazon Region ## Amazon Region
region = "us-east-1" region = "us-east-1"
@ -101,7 +102,7 @@ API endpoint. In the following order the plugin will attempt to authenticate.
# #
# ## Dimension filters for Metric. All dimensions defined for the metric names # ## Dimension filters for Metric. All dimensions defined for the metric names
# ## must be specified in order to retrieve the metric statistics. # ## must be specified in order to retrieve the metric statistics.
# ## 'value' has wildcard / 'glob' matching support such as `p-*`. # ## 'value' has wildcard / 'glob' matching support such as 'p-*'.
# [[inputs.cloudwatch.metrics.dimensions]] # [[inputs.cloudwatch.metrics.dimensions]]
# name = "LoadBalancerName" # name = "LoadBalancerName"
# value = "p-example" # value = "p-example"
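The glob matching described above can be exercised directly with telegraf's `filter` package, whose `NewIncludeExcludeFilter` helper appears later in this same commit; a small sketch (the import path is the only assumption beyond what the diff shows):

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/filter"
)

func main() {
	// Build a filter equivalent to the dimension value "p-*" above;
	// NewIncludeExcludeFilter takes include and exclude pattern lists.
	f, err := filter.NewIncludeExcludeFilter([]string{"p-*"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(f.Match("p-example")) // true: matches the glob
	fmt.Println(f.Match("q-example")) // false: prefix differs
}
```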

View File

@ -10,15 +10,16 @@ import (
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" cwClient "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
internalaws "github.com/influxdata/telegraf/config/aws" internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/metric" internalMetric "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/proxy" internalProxy "github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -36,7 +37,7 @@ type CloudWatch struct {
StatisticInclude []string `toml:"statistic_include"` StatisticInclude []string `toml:"statistic_include"`
Timeout config.Duration `toml:"timeout"` Timeout config.Duration `toml:"timeout"`
proxy.HTTPProxy internalProxy.HTTPProxy
Period config.Duration `toml:"period"` Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"` Delay config.Duration `toml:"delay"`
@ -76,12 +77,12 @@ type metricCache struct {
ttl time.Duration ttl time.Duration
built time.Time built time.Time
metrics []filteredMetric metrics []filteredMetric
queries []*cloudwatch.MetricDataQuery queries []*cwClient.MetricDataQuery
} }
type cloudwatchClient interface { type cloudwatchClient interface {
ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) ListMetrics(*cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error)
GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) GetMetricData(*cwClient.GetMetricDataInput) (*cwClient.GetMetricDataOutput, error)
} }
// SampleConfig returns the default configuration of the Cloudwatch input plugin. // SampleConfig returns the default configuration of the Cloudwatch input plugin.
@ -171,7 +172,7 @@ func (c *CloudWatch) SampleConfig() string {
# #
# ## Dimension filters for Metric. All dimensions defined for the metric names # ## Dimension filters for Metric. All dimensions defined for the metric names
# ## must be specified in order to retrieve the metric statistics. # ## must be specified in order to retrieve the metric statistics.
# ## 'value' has wildcard / 'glob' matching support. such as 'p-*'. # ## 'value' has wildcard / 'glob' matching support such as 'p-*'.
# [[inputs.cloudwatch.metrics.dimensions]] # [[inputs.cloudwatch.metrics.dimensions]]
# name = "LoadBalancerName" # name = "LoadBalancerName"
# value = "p-example" # value = "p-example"
@ -223,11 +224,11 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
rLock := sync.Mutex{} rLock := sync.Mutex{}
results := []*cloudwatch.MetricDataResult{} results := []*cwClient.MetricDataResult{}
// 500 is the maximum number of metric data queries a `GetMetricData` request can contain. // 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
batchSize := 500 batchSize := 500
var batches [][]*cloudwatch.MetricDataQuery var batches [][]*cwClient.MetricDataQuery
for batchSize < len(queries) { for batchSize < len(queries) {
queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize]) queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize])
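The three-index slice expression in the loop above is the standard Go batching idiom: `queries[0:batchSize:batchSize]` caps the batch's capacity at `batchSize`, so a later `append` to that batch cannot overwrite the remaining queries. A standalone sketch with a small batch size:

```go
package main

import "fmt"

func main() {
	queries := []int{1, 2, 3, 4, 5}
	batchSize := 2
	var batches [][]int
	for batchSize < len(queries) {
		// Both right-hand sides are evaluated before assignment, so
		// the batch is carved off before queries is re-sliced.
		queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize])
	}
	// The remainder append is elided from the hunk above but has to
	// follow for the final partial batch.
	batches = append(batches, queries)
	fmt.Println(batches) // [[1 2] [3 4] [5]]
}
```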
@ -237,7 +238,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
for i := range batches { for i := range batches {
wg.Add(1) wg.Add(1)
<-lmtr.C <-lmtr.C
go func(inm []*cloudwatch.MetricDataQuery) { go func(inm []*cwClient.MetricDataQuery) {
defer wg.Done() defer wg.Done()
result, err := c.gatherMetrics(c.getDataInputs(inm)) result, err := c.gatherMetrics(c.getDataInputs(inm))
if err != nil { if err != nil {
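The `<-lmtr.C` receive above gates each goroutine launch on the rate limiter. A stand-in sketch of the pattern using a plain `time.Ticker` (the real `internal/limiter` package adds burst handling on top of this):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Stand-in for internal/limiter: one token per tick.
	lmtr := time.NewTicker(100 * time.Millisecond)
	defer lmtr.Stop()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		<-lmtr.C // block until the limiter releases a token
		go func(batch int) {
			defer wg.Done()
			fmt.Println("gathering batch", batch)
		}(i)
	}
	wg.Wait()
}
```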
@ -294,7 +295,7 @@ func (c *CloudWatch) initializeCloudWatch() error {
} }
loglevel := aws.LogOff loglevel := aws.LogOff
c.client = cloudwatch.New(configProvider, cfg.WithLogLevel(loglevel)) c.client = cwClient.New(configProvider, cfg.WithLogLevel(loglevel))
// Initialize regex matchers for each Dimension value. // Initialize regex matchers for each Dimension value.
for _, m := range c.Metrics { for _, m := range c.Metrics {
@ -312,7 +313,7 @@ func (c *CloudWatch) initializeCloudWatch() error {
} }
type filteredMetric struct { type filteredMetric struct {
metrics []*cloudwatch.Metric metrics []*cwClient.Metric
statFilter filter.Filter statFilter filter.Filter
} }
@ -327,17 +328,17 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
// check for provided metric filter // check for provided metric filter
if c.Metrics != nil { if c.Metrics != nil {
for _, m := range c.Metrics { for _, m := range c.Metrics {
metrics := []*cloudwatch.Metric{} metrics := []*cwClient.Metric{}
if !hasWildcard(m.Dimensions) { if !hasWildcard(m.Dimensions) {
dimensions := make([]*cloudwatch.Dimension, len(m.Dimensions)) dimensions := make([]*cwClient.Dimension, len(m.Dimensions))
for k, d := range m.Dimensions { for k, d := range m.Dimensions {
dimensions[k] = &cloudwatch.Dimension{ dimensions[k] = &cwClient.Dimension{
Name: aws.String(d.Name), Name: aws.String(d.Name),
Value: aws.String(d.Value), Value: aws.String(d.Value),
} }
} }
for _, name := range m.MetricNames { for _, name := range m.MetricNames {
metrics = append(metrics, &cloudwatch.Metric{ metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(c.Namespace), Namespace: aws.String(c.Namespace),
MetricName: aws.String(name), MetricName: aws.String(name),
Dimensions: dimensions, Dimensions: dimensions,
@ -351,7 +352,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
for _, name := range m.MetricNames { for _, name := range m.MetricNames {
for _, metric := range allMetrics { for _, metric := range allMetrics {
if isSelected(name, metric, m.Dimensions) { if isSelected(name, metric, m.Dimensions) {
metrics = append(metrics, &cloudwatch.Metric{ metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(c.Namespace), Namespace: aws.String(c.Namespace),
MetricName: aws.String(name), MetricName: aws.String(name),
Dimensions: metric.Dimensions, Dimensions: metric.Dimensions,
@ -399,11 +400,11 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
} }
// fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace. // fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace.
func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) { func (c *CloudWatch) fetchNamespaceMetrics() ([]*cwClient.Metric, error) {
metrics := []*cloudwatch.Metric{} metrics := []*cwClient.Metric{}
var token *string var token *string
var params *cloudwatch.ListMetricsInput var params *cwClient.ListMetricsInput
var recentlyActive *string var recentlyActive *string
switch c.RecentlyActive { switch c.RecentlyActive {
@ -412,9 +413,9 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
default: default:
recentlyActive = nil recentlyActive = nil
} }
params = &cloudwatch.ListMetricsInput{ params = &cwClient.ListMetricsInput{
Namespace: aws.String(c.Namespace), Namespace: aws.String(c.Namespace),
Dimensions: []*cloudwatch.DimensionFilter{}, Dimensions: []*cwClient.DimensionFilter{},
NextToken: token, NextToken: token,
MetricName: nil, MetricName: nil,
RecentlyActive: recentlyActive, RecentlyActive: recentlyActive,
@ -451,75 +452,75 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) {
} }
// getDataQueries gets all of the possible queries so we can maximize the request payload. // getDataQueries gets all of the possible queries so we can maximize the request payload.
func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cloudwatch.MetricDataQuery { func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClient.MetricDataQuery {
if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() { if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() {
return c.metricCache.queries return c.metricCache.queries
} }
c.queryDimensions = map[string]*map[string]string{} c.queryDimensions = map[string]*map[string]string{}
dataQueries := []*cloudwatch.MetricDataQuery{} dataQueries := []*cwClient.MetricDataQuery{}
for i, filtered := range filteredMetrics { for i, filtered := range filteredMetrics {
for j, metric := range filtered.metrics { for j, metric := range filtered.metrics {
id := strconv.Itoa(j) + "_" + strconv.Itoa(i) id := strconv.Itoa(j) + "_" + strconv.Itoa(i)
dimension := ctod(metric.Dimensions) dimension := ctod(metric.Dimensions)
if filtered.statFilter.Match("average") { if filtered.statFilter.Match("average") {
c.queryDimensions["average_"+id] = dimension c.queryDimensions["average_"+id] = dimension
dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
Id: aws.String("average_" + id), Id: aws.String("average_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_average")), Label: aws.String(snakeCase(*metric.MetricName + "_average")),
MetricStat: &cloudwatch.MetricStat{ MetricStat: &cwClient.MetricStat{
Metric: metric, Metric: metric,
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticAverage), Stat: aws.String(cwClient.StatisticAverage),
}, },
}) })
} }
if filtered.statFilter.Match("maximum") { if filtered.statFilter.Match("maximum") {
c.queryDimensions["maximum_"+id] = dimension c.queryDimensions["maximum_"+id] = dimension
dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
Id: aws.String("maximum_" + id), Id: aws.String("maximum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")), Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
MetricStat: &cloudwatch.MetricStat{ MetricStat: &cwClient.MetricStat{
Metric: metric, Metric: metric,
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticMaximum), Stat: aws.String(cwClient.StatisticMaximum),
}, },
}) })
} }
if filtered.statFilter.Match("minimum") { if filtered.statFilter.Match("minimum") {
c.queryDimensions["minimum_"+id] = dimension c.queryDimensions["minimum_"+id] = dimension
dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
Id: aws.String("minimum_" + id), Id: aws.String("minimum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")), Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
MetricStat: &cloudwatch.MetricStat{ MetricStat: &cwClient.MetricStat{
Metric: metric, Metric: metric,
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticMinimum), Stat: aws.String(cwClient.StatisticMinimum),
}, },
}) })
} }
if filtered.statFilter.Match("sum") { if filtered.statFilter.Match("sum") {
c.queryDimensions["sum_"+id] = dimension c.queryDimensions["sum_"+id] = dimension
dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
Id: aws.String("sum_" + id), Id: aws.String("sum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sum")), Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
MetricStat: &cloudwatch.MetricStat{ MetricStat: &cwClient.MetricStat{
Metric: metric, Metric: metric,
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticSum), Stat: aws.String(cwClient.StatisticSum),
}, },
}) })
} }
if filtered.statFilter.Match("sample_count") { if filtered.statFilter.Match("sample_count") {
c.queryDimensions["sample_count_"+id] = dimension c.queryDimensions["sample_count_"+id] = dimension
dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
Id: aws.String("sample_count_" + id), Id: aws.String("sample_count_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")), Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
MetricStat: &cloudwatch.MetricStat{ MetricStat: &cwClient.MetricStat{
Metric: metric, Metric: metric,
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())), Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
Stat: aws.String(cloudwatch.StatisticSampleCount), Stat: aws.String(cwClient.StatisticSampleCount),
}, },
}) })
} }
@ -546,9 +547,9 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cloudwa
// gatherMetrics gets metric data from Cloudwatch. // gatherMetrics gets metric data from Cloudwatch.
func (c *CloudWatch) gatherMetrics( func (c *CloudWatch) gatherMetrics(
params *cloudwatch.GetMetricDataInput, params *cwClient.GetMetricDataInput,
) ([]*cloudwatch.MetricDataResult, error) { ) ([]*cwClient.MetricDataResult, error) {
results := []*cloudwatch.MetricDataResult{} results := []*cwClient.MetricDataResult{}
for { for {
resp, err := c.client.GetMetricData(params) resp, err := c.client.GetMetricData(params)
@ -568,10 +569,10 @@ func (c *CloudWatch) gatherMetrics(
func (c *CloudWatch) aggregateMetrics( func (c *CloudWatch) aggregateMetrics(
acc telegraf.Accumulator, acc telegraf.Accumulator,
metricDataResults []*cloudwatch.MetricDataResult, metricDataResults []*cwClient.MetricDataResult,
) error { ) error {
var ( var (
grouper = metric.NewSeriesGrouper() grouper = internalMetric.NewSeriesGrouper()
namespace = sanitizeMeasurement(c.Namespace) namespace = sanitizeMeasurement(c.Namespace)
) )
@ -626,7 +627,7 @@ func snakeCase(s string) string {
} }
// ctod converts cloudwatch dimensions to regular dimensions. // ctod converts cloudwatch dimensions to regular dimensions.
func ctod(cDimensions []*cloudwatch.Dimension) *map[string]string { func ctod(cDimensions []*cwClient.Dimension) *map[string]string {
dimensions := map[string]string{} dimensions := map[string]string{}
for i := range cDimensions { for i := range cDimensions {
dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value
@ -634,8 +635,8 @@ func ctod(cDimensions []*cloudwatch.Dimension) *map[string]string {
return &dimensions return &dimensions
} }
func (c *CloudWatch) getDataInputs(dataQueries []*cloudwatch.MetricDataQuery) *cloudwatch.GetMetricDataInput { func (c *CloudWatch) getDataInputs(dataQueries []*cwClient.MetricDataQuery) *cwClient.GetMetricDataInput {
return &cloudwatch.GetMetricDataInput{ return &cwClient.GetMetricDataInput{
StartTime: aws.Time(c.windowStart), StartTime: aws.Time(c.windowStart),
EndTime: aws.Time(c.windowEnd), EndTime: aws.Time(c.windowEnd),
MetricDataQueries: dataQueries, MetricDataQueries: dataQueries,
@ -656,7 +657,7 @@ func hasWildcard(dimensions []*Dimension) bool {
return false return false
} }
func isSelected(name string, metric *cloudwatch.Metric, dimensions []*Dimension) bool { func isSelected(name string, metric *cwClient.Metric, dimensions []*Dimension) bool {
if name != *metric.MetricName { if name != *metric.MetricName {
return false return false
} }

View File

@ -6,8 +6,7 @@ import (
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" cwClient "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
@ -18,13 +17,13 @@ import (
type mockGatherCloudWatchClient struct{} type mockGatherCloudWatchClient struct{}
func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) { func (m *mockGatherCloudWatchClient) ListMetrics(params *cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error) {
return &cloudwatch.ListMetricsOutput{ return &cwClient.ListMetricsOutput{
Metrics: []*cloudwatch.Metric{ Metrics: []*cwClient.Metric{
{ {
Namespace: params.Namespace, Namespace: params.Namespace,
MetricName: aws.String("Latency"), MetricName: aws.String("Latency"),
Dimensions: []*cloudwatch.Dimension{ Dimensions: []*cwClient.Dimension{
{ {
Name: aws.String("LoadBalancerName"), Name: aws.String("LoadBalancerName"),
Value: aws.String("p-example"), Value: aws.String("p-example"),
@ -35,9 +34,9 @@ func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsI
}, nil }, nil
} }
func (m *mockGatherCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) { func (m *mockGatherCloudWatchClient) GetMetricData(params *cwClient.GetMetricDataInput) (*cwClient.GetMetricDataOutput, error) {
return &cloudwatch.GetMetricDataOutput{ return &cwClient.GetMetricDataOutput{
MetricDataResults: []*cloudwatch.MetricDataResult{ MetricDataResults: []*cwClient.MetricDataResult{
{ {
Id: aws.String("minimum_0_0"), Id: aws.String("minimum_0_0"),
Label: aws.String("latency_minimum"), Label: aws.String("latency_minimum"),
@ -98,8 +97,8 @@ func (m *mockGatherCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricD
} }
func TestSnakeCase(t *testing.T) { func TestSnakeCase(t *testing.T) {
assert.Equal(t, "cluster_name", snakeCase("Cluster Name")) require.Equal(t, "cluster_name", snakeCase("Cluster Name"))
assert.Equal(t, "broker_id", snakeCase("Broker ID")) require.Equal(t, "broker_id", snakeCase("Broker ID"))
} }
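For context, here is a minimal `snakeCase` consistent with the two assertions above; a sketch only, since the plugin's real implementation (not shown in this hunk) may normalize more characters:

```go
package main

import (
	"fmt"
	"strings"
)

// snakeCase sketch: lower-case the input and replace spaces with
// underscores, which is enough to satisfy both test cases.
func snakeCase(s string) string {
	return strings.ReplaceAll(strings.ToLower(s), " ", "_")
}

func main() {
	fmt.Println(snakeCase("Cluster Name")) // cluster_name
	fmt.Println(snakeCase("Broker ID"))    // broker_id
}
```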
func TestGather(t *testing.T) { func TestGather(t *testing.T) {
@ -116,7 +115,7 @@ func TestGather(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
c.client = &mockGatherCloudWatchClient{} c.client = &mockGatherCloudWatchClient{}
assert.NoError(t, acc.GatherError(c.Gather)) require.NoError(t, acc.GatherError(c.Gather))
fields := map[string]interface{}{} fields := map[string]interface{}{}
fields["latency_minimum"] = 0.1 fields["latency_minimum"] = 0.1
@ -129,14 +128,14 @@ func TestGather(t *testing.T) {
tags["region"] = "us-east-1" tags["region"] = "us-east-1"
tags["load_balancer_name"] = "p-example" tags["load_balancer_name"] = "p-example"
assert.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
} }
type mockSelectMetricsCloudWatchClient struct{} type mockSelectMetricsCloudWatchClient struct{}
func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) { func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error) {
metrics := []*cloudwatch.Metric{} metrics := []*cwClient.Metric{}
// 4 metrics are available // 4 metrics are available
metricNames := []string{"Latency", "RequestCount", "HealthyHostCount", "UnHealthyHostCount"} metricNames := []string{"Latency", "RequestCount", "HealthyHostCount", "UnHealthyHostCount"}
// for 3 ELBs // for 3 ELBs
@ -146,10 +145,10 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetric
for _, m := range metricNames { for _, m := range metricNames {
for _, lb := range loadBalancers { for _, lb := range loadBalancers {
// For each metric/ELB pair, we get an aggregate value across all AZs. // For each metric/ELB pair, we get an aggregate value across all AZs.
metrics = append(metrics, &cloudwatch.Metric{ metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String("AWS/ELB"), Namespace: aws.String("AWS/ELB"),
MetricName: aws.String(m), MetricName: aws.String(m),
Dimensions: []*cloudwatch.Dimension{ Dimensions: []*cwClient.Dimension{
{ {
Name: aws.String("LoadBalancerName"), Name: aws.String("LoadBalancerName"),
Value: aws.String(lb), Value: aws.String(lb),
@ -158,10 +157,10 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetric
}) })
for _, az := range availabilityZones { for _, az := range availabilityZones {
// We get a metric for each metric/ELB/AZ triplet. // We get a metric for each metric/ELB/AZ triplet.
metrics = append(metrics, &cloudwatch.Metric{ metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String("AWS/ELB"), Namespace: aws.String("AWS/ELB"),
MetricName: aws.String(m), MetricName: aws.String(m),
Dimensions: []*cloudwatch.Dimension{ Dimensions: []*cwClient.Dimension{
{ {
Name: aws.String("LoadBalancerName"), Name: aws.String("LoadBalancerName"),
Value: aws.String(lb), Value: aws.String(lb),
@ -176,13 +175,13 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cloudwatch.ListMetric
} }
} }
result := &cloudwatch.ListMetricsOutput{ result := &cwClient.ListMetricsOutput{
Metrics: metrics, Metrics: metrics,
} }
return result, nil return result, nil
} }
func (m *mockSelectMetricsCloudWatchClient) GetMetricData(_ *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) { func (m *mockSelectMetricsCloudWatchClient) GetMetricData(_ *cwClient.GetMetricDataInput) (*cwClient.GetMetricDataOutput, error) {
return nil, nil return nil, nil
} }
@ -212,24 +211,24 @@ func TestSelectMetrics(t *testing.T) {
}, },
} }
err := c.initializeCloudWatch() err := c.initializeCloudWatch()
assert.NoError(t, err) require.NoError(t, err)
c.client = &mockSelectMetricsCloudWatchClient{} c.client = &mockSelectMetricsCloudWatchClient{}
filtered, err := getFilteredMetrics(c) filtered, err := getFilteredMetrics(c)
// We've asked for 2 (out of 4) metrics, over all 3 load balancers across both // We've asked for 2 (out of 4) metrics, over all 3 load balancers across both
// AZs, so we should get 2 * 3 * 2 = 12 metrics. // AZs, so we should get 2 * 3 * 2 = 12 metrics.
assert.Equal(t, 12, len(filtered[0].metrics)) require.Equal(t, 12, len(filtered[0].metrics))
assert.NoError(t, err) require.NoError(t, err)
} }
func TestGenerateStatisticsInputParams(t *testing.T) { func TestGenerateStatisticsInputParams(t *testing.T) {
d := &cloudwatch.Dimension{ d := &cwClient.Dimension{
Name: aws.String("LoadBalancerName"), Name: aws.String("LoadBalancerName"),
Value: aws.String("p-example"), Value: aws.String("p-example"),
} }
m := &cloudwatch.Metric{ m := &cwClient.Metric{
MetricName: aws.String("Latency"), MetricName: aws.String("Latency"),
Dimensions: []*cloudwatch.Dimension{d}, Dimensions: []*cwClient.Dimension{d},
} }
duration, _ := time.ParseDuration("1m") duration, _ := time.ParseDuration("1m")
@ -248,25 +247,25 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
c.updateWindow(now) c.updateWindow(now)
statFilter, _ := filter.NewIncludeExcludeFilter(nil, nil) statFilter, _ := filter.NewIncludeExcludeFilter(nil, nil)
queries := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}}) queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}})
params := c.getDataInputs(queries) params := c.getDataInputs(queries)
assert.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
assert.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
require.Len(t, params.MetricDataQueries, 5) require.Len(t, params.MetricDataQueries, 5)
assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) require.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60)
} }
func TestGenerateStatisticsInputParamsFiltered(t *testing.T) { func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
d := &cloudwatch.Dimension{ d := &cwClient.Dimension{
Name: aws.String("LoadBalancerName"), Name: aws.String("LoadBalancerName"),
Value: aws.String("p-example"), Value: aws.String("p-example"),
} }
m := &cloudwatch.Metric{ m := &cwClient.Metric{
MetricName: aws.String("Latency"), MetricName: aws.String("Latency"),
Dimensions: []*cloudwatch.Dimension{d}, Dimensions: []*cwClient.Dimension{d},
} }
duration, _ := time.ParseDuration("1m") duration, _ := time.ParseDuration("1m")
@ -285,14 +284,14 @@ func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
c.updateWindow(now) c.updateWindow(now)
statFilter, _ := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil) statFilter, _ := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil)
queries := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}}) queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}})
params := c.getDataInputs(queries) params := c.getDataInputs(queries)
assert.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
assert.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
require.Len(t, params.MetricDataQueries, 2) require.Len(t, params.MetricDataQueries, 2)
assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) require.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60)
} }
func TestMetricsCacheTimeout(t *testing.T) { func TestMetricsCacheTimeout(t *testing.T) {
@ -302,9 +301,9 @@ func TestMetricsCacheTimeout(t *testing.T) {
ttl: time.Minute, ttl: time.Minute,
} }
assert.True(t, cache.isValid()) require.True(t, cache.isValid())
cache.built = time.Now().Add(-time.Minute) cache.built = time.Now().Add(-time.Minute)
assert.False(t, cache.isValid()) require.False(t, cache.isValid())
} }
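The cache check exercised here is straightforward; a sketch of `isValid` implied by the test (assumption: validity is simply cache age below ttl):

```go
package main

import (
	"fmt"
	"time"
)

// Minimal stand-in for metricCache; the full struct earlier in the
// diff also carries metrics and queries.
type metricCache struct {
	ttl   time.Duration
	built time.Time
}

// isValid sketch: the cache is fresh while its age is below ttl.
func (c *metricCache) isValid() bool {
	return time.Since(c.built) < c.ttl
}

func main() {
	cache := &metricCache{built: time.Now(), ttl: time.Minute}
	fmt.Println(cache.isValid()) // true
	cache.built = time.Now().Add(-time.Minute)
	fmt.Println(cache.isValid()) // false
}
```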
func TestUpdateWindow(t *testing.T) { func TestUpdateWindow(t *testing.T) {
@ -319,23 +318,23 @@ func TestUpdateWindow(t *testing.T) {
now := time.Now() now := time.Now()
assert.True(t, c.windowEnd.IsZero()) require.True(t, c.windowEnd.IsZero())
assert.True(t, c.windowStart.IsZero()) require.True(t, c.windowStart.IsZero())
c.updateWindow(now) c.updateWindow(now)
newStartTime := c.windowEnd newStartTime := c.windowEnd
// initial window just has a single period // initial window just has a single period
assert.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay)))
assert.EqualValues(t, c.windowStart, now.Add(-time.Duration(c.Delay)).Add(-time.Duration(c.Period))) require.EqualValues(t, c.windowStart, now.Add(-time.Duration(c.Delay)).Add(-time.Duration(c.Period)))
now = time.Now() now = time.Now()
c.updateWindow(now) c.updateWindow(now)
// subsequent window uses previous end time as start time // subsequent window uses previous end time as start time
assert.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay)))
assert.EqualValues(t, c.windowStart, newStartTime) require.EqualValues(t, c.windowStart, newStartTime)
} }
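The two assertion pairs pin down the window logic: the first call opens a window exactly one Period wide ending Delay ago, and each later call starts where the previous window ended, so no interval is skipped or double-counted. A reconstruction under that reading (field types simplified from the plugin's config.Duration):

```go
package main

import (
	"fmt"
	"time"
)

// Stand-in carrying only the fields the window logic touches.
type CloudWatch struct {
	Delay, Period          time.Duration
	windowStart, windowEnd time.Time
}

// updateWindow sketch reconstructed from the test assertions.
func (c *CloudWatch) updateWindow(relativeTo time.Time) {
	windowEnd := relativeTo.Add(-c.Delay)
	if c.windowEnd.IsZero() {
		// First run: look back one collection period.
		c.windowStart = windowEnd.Add(-c.Period)
	} else {
		// Later runs: resume from the previous end time.
		c.windowStart = c.windowEnd
	}
	c.windowEnd = windowEnd
}

func main() {
	c := &CloudWatch{Delay: 5 * time.Minute, Period: time.Minute}
	c.updateWindow(time.Now())
	fmt.Println(c.windowEnd.Sub(c.windowStart)) // 1m0s on the first call
}
```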
func TestProxyFunction(t *testing.T) { func TestProxyFunction(t *testing.T) {

View File

@ -7,7 +7,8 @@ import (
"sync" "sync"
"time" "time"
couchbase "github.com/couchbase/go-couchbase" couchbaseClient "github.com/couchbase/go-couchbase"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -33,7 +34,7 @@ var sampleConfig = `
## If no port is specified, 8091 is used. ## If no port is specified, 8091 is used.
servers = ["http://localhost:8091"] servers = ["http://localhost:8091"]
## Filter fields to include only here. ## Filter bucket fields to include only here.
# bucket_stats_included = ["quota_percent_used", "ops_per_sec", "disk_fetches", "item_count", "disk_used", "data_used", "mem_used"] # bucket_stats_included = ["quota_percent_used", "ops_per_sec", "disk_fetches", "item_count", "disk_used", "data_used", "mem_used"]
` `
@ -45,14 +46,14 @@ func (cb *Couchbase) SampleConfig() string {
} }
func (cb *Couchbase) Description() string { func (cb *Couchbase) Description() string {
return "Read metrics from one or many couchbase clusters" return "Read per-node and per-bucket metrics from Couchbase"
} }
// Reads stats from all configured clusters. Accumulates stats. // Reads stats from all configured clusters. Accumulates stats.
// Returns one of the errors encountered while gathering stats (if any). // Returns one of the errors encountered while gathering stats (if any).
func (cb *Couchbase) Gather(acc telegraf.Accumulator) error { func (cb *Couchbase) Gather(acc telegraf.Accumulator) error {
if len(cb.Servers) == 0 { if len(cb.Servers) == 0 {
return cb.gatherServer("http://localhost:8091/", acc, nil) return cb.gatherServer(acc, "http://localhost:8091/", nil)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -60,7 +61,7 @@ func (cb *Couchbase) Gather(acc telegraf.Accumulator) error {
wg.Add(1) wg.Add(1)
go func(serv string) { go func(serv string) {
defer wg.Done() defer wg.Done()
acc.AddError(cb.gatherServer(serv, acc, nil)) acc.AddError(cb.gatherServer(acc, serv, nil))
}(serv) }(serv)
} }
@ -69,9 +70,9 @@ func (cb *Couchbase) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (cb *Couchbase) gatherServer(addr string, acc telegraf.Accumulator, pool *couchbase.Pool) error { func (cb *Couchbase) gatherServer(acc telegraf.Accumulator, addr string, pool *couchbaseClient.Pool) error {
if pool == nil { if pool == nil {
client, err := couchbase.Connect(addr) client, err := couchbaseClient.Connect(addr)
if err != nil { if err != nil {
return err return err
} }

View File

@ -43,7 +43,7 @@ func TestGatherServer(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = cb.gatherServer(fakeServer.URL, &acc, &pool) err = cb.gatherServer(&acc, fakeServer.URL, &pool)
require.NoError(t, err) require.NoError(t, err)
acc.AssertContainsTaggedFields(t, "couchbase_node", acc.AssertContainsTaggedFields(t, "couchbase_node",
map[string]interface{}{"memory_free": 23181365248.0, "memory_total": 64424656896.0}, map[string]interface{}{"memory_free": 23181365248.0, "memory_total": 64424656896.0},

View File

@ -4,14 +4,15 @@ The `cpu` plugin gathers metrics on the system CPUs.
#### Configuration #### Configuration
```toml ```toml
# Read metrics about cpu usage
[[inputs.cpu]] [[inputs.cpu]]
## Whether to report per-cpu stats or not ## Whether to report per-cpu stats or not
percpu = true percpu = true
## Whether to report total system cpu stats or not ## Whether to report total system cpu stats or not
totalcpu = true totalcpu = true
## If true, collect raw CPU time metrics. ## If true, collect raw CPU time metrics
collect_cpu_time = false collect_cpu_time = false
## If true, compute and report the sum of all non-idle CPU states. ## If true, compute and report the sum of all non-idle CPU states
report_active = false report_active = false
``` ```

View File

@ -4,15 +4,16 @@ import (
"fmt" "fmt"
"time" "time"
cpuUtil "github.com/shirou/gopsutil/cpu"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/system" "github.com/influxdata/telegraf/plugins/inputs/system"
"github.com/shirou/gopsutil/cpu"
) )
type CPUStats struct { type CPUStats struct {
ps system.PS ps system.PS
lastStats map[string]cpu.TimesStat lastStats map[string]cpuUtil.TimesStat
PerCPU bool `toml:"percpu"` PerCPU bool `toml:"percpu"`
TotalCPU bool `toml:"totalcpu"` TotalCPU bool `toml:"totalcpu"`
@ -123,7 +124,7 @@ func (c *CPUStats) Gather(acc telegraf.Accumulator) error {
acc.AddGauge("cpu", fieldsG, tags, now) acc.AddGauge("cpu", fieldsG, tags, now)
} }
c.lastStats = make(map[string]cpu.TimesStat) c.lastStats = make(map[string]cpuUtil.TimesStat)
for _, cts := range times { for _, cts := range times {
c.lastStats[cts.CPU] = cts c.lastStats[cts.CPU] = cts
} }
@ -131,12 +132,12 @@ func (c *CPUStats) Gather(acc telegraf.Accumulator) error {
return err return err
} }
func totalCPUTime(t cpu.TimesStat) float64 { func totalCPUTime(t cpuUtil.TimesStat) float64 {
total := t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + t.Idle total := t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + t.Idle
return total return total
} }
func activeCPUTime(t cpu.TimesStat) float64 { func activeCPUTime(t cpuUtil.TimesStat) float64 {
active := totalCPUTime(t) - t.Idle active := totalCPUTime(t) - t.Idle
return active return active
} }

View File

@ -4,11 +4,11 @@ import (
"fmt" "fmt"
"testing" "testing"
cpuUtil "github.com/shirou/gopsutil/cpu"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/system" "github.com/influxdata/telegraf/plugins/inputs/system"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/cpu"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestCPUStats(t *testing.T) { func TestCPUStats(t *testing.T) {
@ -16,7 +16,7 @@ func TestCPUStats(t *testing.T) {
defer mps.AssertExpectations(t) defer mps.AssertExpectations(t)
var acc testutil.Accumulator var acc testutil.Accumulator
cts := cpu.TimesStat{ cts := cpuUtil.TimesStat{
CPU: "cpu0", CPU: "cpu0",
User: 8.8, User: 8.8,
System: 8.2, System: 8.2,
@ -30,7 +30,7 @@ func TestCPUStats(t *testing.T) {
GuestNice: 0.324, GuestNice: 0.324,
} }
cts2 := cpu.TimesStat{ cts2 := cpuUtil.TimesStat{
CPU: "cpu0", CPU: "cpu0",
User: 24.9, // increased by 16.1 User: 24.9, // increased by 16.1
System: 10.9, // increased by 2.7 System: 10.9, // increased by 2.7
@ -44,7 +44,7 @@ func TestCPUStats(t *testing.T) {
GuestNice: 2.524, // increased by 2.2 GuestNice: 2.524, // increased by 2.2
} }
mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) mps.On("CPUTimes").Return([]cpuUtil.TimesStat{cts}, nil)
cs := NewCPUStats(&mps) cs := NewCPUStats(&mps)
@ -66,7 +66,7 @@ func TestCPUStats(t *testing.T) {
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0) assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0)
mps2 := system.MockPS{} mps2 := system.MockPS{}
mps2.On("CPUTimes").Return([]cpu.TimesStat{cts2}, nil) mps2.On("CPUTimes").Return([]cpuUtil.TimesStat{cts2}, nil)
cs.ps = &mps2 cs.ps = &mps2
// Should have added cpu percentages too // Should have added cpu percentages too
@ -131,8 +131,7 @@ func assertContainsTaggedFloat(
return return
} }
} else { } else {
assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64", require.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64", measurement))
measurement))
} }
} }
} }
@ -141,7 +140,7 @@ func assertContainsTaggedFloat(
msg := fmt.Sprintf( msg := fmt.Sprintf(
"Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f", "Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f",
measurement, delta, expectedValue, actualValue) measurement, delta, expectedValue, actualValue)
assert.Fail(t, msg) require.Fail(t, msg)
} }
// TestCPUCountIncrease tests that no errors are encountered if the number of // TestCPUCountIncrease tests that no errors are encountered if the number of
@ -155,7 +154,7 @@ func TestCPUCountIncrease(t *testing.T) {
cs := NewCPUStats(&mps) cs := NewCPUStats(&mps)
mps.On("CPUTimes").Return( mps.On("CPUTimes").Return(
[]cpu.TimesStat{ []cpuUtil.TimesStat{
{ {
CPU: "cpu0", CPU: "cpu0",
}, },
@ -165,7 +164,7 @@ func TestCPUCountIncrease(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
mps2.On("CPUTimes").Return( mps2.On("CPUTimes").Return(
[]cpu.TimesStat{ []cpuUtil.TimesStat{
{ {
CPU: "cpu0", CPU: "cpu0",
}, },
@ -186,28 +185,28 @@ func TestCPUTimesDecrease(t *testing.T) {
defer mps.AssertExpectations(t) defer mps.AssertExpectations(t)
var acc testutil.Accumulator var acc testutil.Accumulator
cts := cpu.TimesStat{ cts := cpuUtil.TimesStat{
CPU: "cpu0", CPU: "cpu0",
User: 18, User: 18,
Idle: 80, Idle: 80,
Iowait: 2, Iowait: 2,
} }
cts2 := cpu.TimesStat{ cts2 := cpuUtil.TimesStat{
CPU: "cpu0", CPU: "cpu0",
User: 38, // increased by 20 User: 38, // increased by 20
Idle: 40, // decreased by 40 Idle: 40, // decreased by 40
Iowait: 1, // decreased by 1 Iowait: 1, // decreased by 1
} }
cts3 := cpu.TimesStat{ cts3 := cpuUtil.TimesStat{
CPU: "cpu0", CPU: "cpu0",
User: 56, // increased by 18 User: 56, // increased by 18
Idle: 120, // increased by 80 Idle: 120, // increased by 80
Iowait: 3, // increased by 2 Iowait: 3, // increased by 2
} }
mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) mps.On("CPUTimes").Return([]cpuUtil.TimesStat{cts}, nil)
cs := NewCPUStats(&mps) cs := NewCPUStats(&mps)
@ -221,7 +220,7 @@ func TestCPUTimesDecrease(t *testing.T) {
assertContainsTaggedFloat(t, &acc, "time_iowait", 2, 0) assertContainsTaggedFloat(t, &acc, "time_iowait", 2, 0)
mps2 := system.MockPS{} mps2 := system.MockPS{}
mps2.On("CPUTimes").Return([]cpu.TimesStat{cts2}, nil) mps2.On("CPUTimes").Return([]cpuUtil.TimesStat{cts2}, nil)
cs.ps = &mps2 cs.ps = &mps2
// CPU times decreased. An error should be raised // CPU times decreased. An error should be raised
@ -229,7 +228,7 @@ func TestCPUTimesDecrease(t *testing.T) {
require.Error(t, err) require.Error(t, err)
mps3 := system.MockPS{} mps3 := system.MockPS{}
mps3.On("CPUTimes").Return([]cpu.TimesStat{cts3}, nil) mps3.On("CPUTimes").Return([]cpuUtil.TimesStat{cts3}, nil)
cs.ps = &mps3 cs.ps = &mps3
err = cs.Gather(&acc) err = cs.Gather(&acc)

View File

@ -4,6 +4,7 @@ The `csgo` plugin gathers metrics from Counter-Strike: Global Offensive servers.
#### Configuration #### Configuration
```toml ```toml
# Fetch metrics from a CSGO SRCDS
[[inputs.csgo]] [[inputs.csgo]]
## Specify servers using the following format: ## Specify servers using the following format:
## servers = [ ## servers = [

View File

@ -8,9 +8,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/james4k/rcon"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/james4k/rcon"
) )
type statsData struct { type statsData struct {
@ -30,7 +31,7 @@ type CSGO struct {
Servers [][]string `toml:"servers"` Servers [][]string `toml:"servers"`
} }
func (_ *CSGO) Description() string { func (*CSGO) Description() string {
return "Fetch metrics from a CSGO SRCDS" return "Fetch metrics from a CSGO SRCDS"
} }
@ -45,7 +46,7 @@ var sampleConfig = `
servers = [] servers = []
` `
func (_ *CSGO) SampleConfig() string { func (*CSGO) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -57,7 +58,7 @@ func (s *CSGO) Gather(acc telegraf.Accumulator) error {
wg.Add(1) wg.Add(1)
go func(ss []string) { go func(ss []string) {
defer wg.Done() defer wg.Done()
acc.AddError(s.gatherServer(ss, requestServer, acc)) acc.AddError(s.gatherServer(acc, ss, requestServer))
}(server) }(server)
} }
@ -72,9 +73,9 @@ func init() {
} }
func (s *CSGO) gatherServer( func (s *CSGO) gatherServer(
acc telegraf.Accumulator,
server []string, server []string,
request func(string, string) (string, error), request func(string, string) (string, error),
acc telegraf.Accumulator,
) error { ) error {
if len(server) != 2 { if len(server) != 2 {
return errors.New("incorrect server config") return errors.New("incorrect server config")

View File

@ -19,7 +19,7 @@ var (
func TestCPUStats(t *testing.T) { func TestCPUStats(t *testing.T) {
c := NewCSGOStats() c := NewCSGOStats()
var acc testutil.Accumulator var acc testutil.Accumulator
err := c.gatherServer(c.Servers[0], requestMock, &acc) err := c.gatherServer(&acc, c.Servers[0], requestMock)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }