Revive fixes - part 1 (#8797)

* Revive fixes for the following set of rules: [rule.blank-imports] [rule.context-as-argument] [rule.context-keys-type] [rule.dot-imports] [rule.error-return] [rule.error-strings] [rule.indent-error-flow] [rule.errorf]

parent 90392e16d1
commit ba66d4facb
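Most of the hunks below apply the same few mechanical patterns. As a rough, self-contained illustration of two of them (hypothetical code, not taken from the files in this diff): error strings become lowercase with no trailing punctuation (rule.error-strings), and an else branch that only follows a returning if is flattened (rule.indent-error-flow).

package main

import (
    "errors"
    "fmt"
)

// rule.error-strings: error text starts lowercase and has no trailing period,
// so it reads cleanly when wrapped inside another message.
var ErrTimeout = errors.New("command timed out")

// rule.indent-error-flow: the early return makes an else block redundant,
// so the fallback path is outdented.
func credentials(roleARN string) (string, error) {
    if roleARN != "" {
        return "assume-role credentials for " + roleARN, nil
    }
    return "root credentials", nil
}

func main() {
    creds, _ := credentials("")
    fmt.Println(creds)
    fmt.Println(fmt.Errorf("running command failed: %w", ErrTimeout))
}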
@@ -22,9 +22,9 @@ type CredentialConfig struct {
func (c *CredentialConfig) Credentials() client.ConfigProvider {
    if c.RoleARN != "" {
        return c.assumeCredentials()
    } else {
        return c.rootCredentials()
    }

    return c.rootCredentials()
}

func (c *CredentialConfig) rootCredentials() client.ConfigProvider {
@@ -50,7 +50,7 @@ func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {

    // If SIGTERM was sent then treat any process error as a timeout.
    if termSent {
        return TimeoutErr
        return ErrTimeout
    }

    // Otherwise there was an error unrelated to termination.

@@ -33,7 +33,7 @@ func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {

    // If SIGTERM was sent then treat any process error as a timeout.
    if termSent {
        return TimeoutErr
        return ErrTimeout
    }

    // Otherwise there was an error unrelated to termination.
@@ -26,11 +26,9 @@ import (
const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

var (
    TimeoutErr = errors.New("Command timed out.")

    NotImplementedError = errors.New("not implemented yet")

    VersionAlreadySetError = errors.New("version has already been set")
    ErrTimeout = errors.New("command timed out")
    ErrorNotImplemented = errors.New("not implemented yet")
    ErrorVersionAlreadySet = errors.New("version has already been set")
)

// Set via the main module

@@ -58,7 +56,7 @@ type ReadWaitCloser struct {
// SetVersion sets the telegraf agent version
func SetVersion(v string) error {
    if version != "" {
        return VersionAlreadySetError
        return ErrorVersionAlreadySet
    }
    version = v
    return nil

@@ -62,7 +62,7 @@ func TestRunTimeout(t *testing.T) {
    err := RunTimeout(cmd, time.Millisecond*20)
    elapsed := time.Since(start)

    assert.Equal(t, TimeoutErr, err)
    assert.Equal(t, ErrTimeout, err)
    // Verify that command gets killed in 20ms, with some breathing room
    assert.True(t, elapsed < time.Millisecond*75)
}

@@ -102,7 +102,7 @@ func TestCombinedOutputTimeout(t *testing.T) {
    _, err := CombinedOutputTimeout(cmd, time.Millisecond*20)
    elapsed := time.Since(start)

    assert.Equal(t, TimeoutErr, err)
    assert.Equal(t, ErrTimeout, err)
    // Verify that command gets killed in 20ms, with some breathing room
    assert.True(t, elapsed < time.Millisecond*75)
}

@@ -273,7 +273,7 @@ func TestVersionAlreadySet(t *testing.T) {
    err = SetVersion("bar")

    assert.Error(t, err)
    assert.IsType(t, VersionAlreadySetError, err)
    assert.IsType(t, ErrorVersionAlreadySet, err)

    assert.Equal(t, "foo", Version())
}
@@ -226,9 +226,8 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
func (b *Buffer) dist(begin, end int) int {
    if begin <= end {
        return end - begin
    } else {
        return b.cap - begin + end
    }
    return b.cap - begin + end
}

// next returns the next index with wrapping.
@@ -541,7 +541,7 @@ func (m *mockOutput) Write(metrics []telegraf.Metric) error {
    m.Lock()
    defer m.Unlock()
    if m.failWrite {
        return fmt.Errorf("Failed Write!")
        return fmt.Errorf("failed write")
    }

    if m.metrics == nil {

@@ -583,7 +583,7 @@ func (m *perfOutput) SampleConfig() string {

func (m *perfOutput) Write(metrics []telegraf.Metric) error {
    if m.failWrite {
        return fmt.Errorf("Failed Write!")
        return fmt.Errorf("failed write")
    }
    return nil
}
@@ -1,6 +1,7 @@
package all

import (
    //Blank imports for plugins to register themselves
    _ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
    _ "github.com/influxdata/telegraf/plugins/aggregators/final"
    _ "github.com/influxdata/telegraf/plugins/aggregators/histogram"

@@ -1,6 +1,7 @@
package all

import (
    //Blank imports for plugins to register themselves
    _ "github.com/influxdata/telegraf/plugins/inputs/activemq"
    _ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
    _ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
@@ -77,7 +77,7 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error {
    for _, u := range n.Urls {
        addr, err := url.Parse(u)
        if err != nil {
            acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err))
            acc.AddError(fmt.Errorf("unable to parse address '%s': %s", u, err))
            continue
        }

@@ -111,7 +111,7 @@ func (n *Apache) createHttpClient() (*http.Client, error) {
func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
    req, err := http.NewRequest("GET", addr.String(), nil)
    if err != nil {
        return fmt.Errorf("error on new request to %s : %s\n", addr.String(), err)
        return fmt.Errorf("error on new request to %s : %s", addr.String(), err)
    }

    if len(n.Username) != 0 && len(n.Password) != 0 {

@@ -120,7 +120,7 @@ func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {

    resp, err := n.client.Do(req)
    if err != nil {
        return fmt.Errorf("error on request to %s : %s\n", addr.String(), err)
        return fmt.Errorf("error on request to %s : %s", addr.String(), err)
    }
    defer resp.Body.Close()
@@ -58,7 +58,7 @@ func (b *Bind) Gather(acc telegraf.Accumulator) error {
    for _, u := range b.Urls {
        addr, err := url.Parse(u)
        if err != nil {
            acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err))
            acc.AddError(fmt.Errorf("unable to parse address '%s': %s", u, err))
            continue
        }

@@ -88,7 +88,7 @@ func (b *Bind) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
        // BIND 9.9+
        return b.readStatsXMLv3(addr, acc)
    default:
        return fmt.Errorf("URL %s is ambiguous. Please check plugin documentation for supported URL formats.",
        return fmt.Errorf("provided URL %s is ambiguous, please check plugin documentation for supported URL formats",
            addr)
    }
}

@@ -623,5 +623,5 @@ func TestBindUnparseableURL(t *testing.T) {

    var acc testutil.Accumulator
    err := acc.GatherError(b.Gather)
    assert.Contains(t, err.Error(), "Unable to parse address")
    assert.Contains(t, err.Error(), "unable to parse address")
}
@@ -5,7 +5,6 @@ import (
    "errors"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "strings"

@@ -28,9 +27,10 @@ func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error

type Cassandra struct {
    jClient JolokiaClient
    Context string
    Servers []string
    Metrics []string
    Context string `toml:"context"`
    Servers []string `toml:"servers"`
    Metrics []string `toml:"metrics"`
    Log telegraf.Logger `toml:"-"`
}

type javaMetric struct {

@@ -125,8 +125,7 @@ func (j javaMetric) addTagsFields(out map[string]interface{}) {
        }
        j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
    } else {
        j.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n",
            j.metric, out))
        j.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", j.metric, out))
    }
}

@@ -157,8 +156,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
            addCassandraMetric(k, c, v.(map[string]interface{}))
        }
    } else {
        c.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n",
            c.metric, out))
        c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out))
        return
    }
} else {

@@ -166,8 +164,7 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
        addCassandraMetric(r.(map[string]interface{})["mbean"].(string),
            c, values.(map[string]interface{}))
    } else {
        c.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n",
            c.metric, out))
        c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out))
        return
    }
}
@@ -215,7 +212,7 @@ func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error)

    // Process response
    if resp.StatusCode != http.StatusOK {
        err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
        err = fmt.Errorf("response from url \"%s\" has status code %d (%s), expected %d (%s)",
            requestUrl,
            resp.StatusCode,
            http.StatusText(resp.StatusCode),

@@ -232,8 +229,8 @@ func (j *Cassandra) getAttr(requestUrl *url.URL) (map[string]interface{}, error)

    // Unmarshal json
    var jsonOut map[string]interface{}
    if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
        return nil, errors.New("Error decoding JSON response")
    if err = json.Unmarshal(body, &jsonOut); err != nil {
        return nil, errors.New("error decoding JSON response")
    }

    return jsonOut, nil

@@ -263,8 +260,8 @@ func parseServerTokens(server string) map[string]string {
    return serverTokens
}

func (c *Cassandra) Start(acc telegraf.Accumulator) error {
    log.Println("W! DEPRECATED: The cassandra plugin has been deprecated. " +
func (c *Cassandra) Start(_ telegraf.Accumulator) error {
    c.Log.Warn("DEPRECATED: The cassandra plugin has been deprecated. " +
        "Please use the jolokia2 plugin instead. " +
        "https://github.com/influxdata/telegraf/tree/master/plugins/inputs/jolokia2")
    return nil

@@ -290,8 +287,7 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
        m = newCassandraMetric(serverTokens["host"], metric, acc)
    } else {
        // unsupported metric type
        acc.AddError(fmt.Errorf("E! Unsupported Cassandra metric [%s], skipping",
            metric))
        acc.AddError(fmt.Errorf("unsupported Cassandra metric [%s], skipping", metric))
        continue
    }

@@ -313,7 +309,7 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
            continue
        }
        if out["status"] != 200.0 {
            acc.AddError(fmt.Errorf("URL returned with status %v - %s\n", out["status"], requestUrl))
            acc.AddError(fmt.Errorf("provided URL returned with status %v - %s", out["status"], requestUrl))
            continue
        }
        m.addTagsFields(out)
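The Cassandra hunks above also swap the global log package for a plugin-scoped logger injected through the Log telegraf.Logger `toml:"-"` field. A rough, self-contained sketch of that injection pattern follows; the Logger interface here is a minimal stand-in written for this example, not the real telegraf.Logger definition.

package main

import "fmt"

// Logger is a stand-in with only the method this sketch needs.
type Logger interface {
    Warn(args ...interface{})
}

// consoleLogger is a trivial Logger implementation used for the example.
type consoleLogger struct{}

func (consoleLogger) Warn(args ...interface{}) {
    fmt.Println(append([]interface{}{"W!"}, args...)...)
}

// Cassandra mirrors the shape of the plugin struct in the diff: configuration
// fields carry toml tags, while the injected logger is excluded with `toml:"-"`.
type Cassandra struct {
    Servers []string `toml:"servers"`
    Log     Logger   `toml:"-"`
}

// Start logs the deprecation warning through the injected logger instead of
// calling the global log package directly.
func (c *Cassandra) Start() error {
    c.Log.Warn("DEPRECATED: The cassandra plugin has been deprecated. Please use the jolokia2 plugin instead.")
    return nil
}

func main() {
    c := &Cassandra{Log: consoleLogger{}}
    _ = c.Start()
}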
@ -20,8 +20,8 @@ import (
|
|||
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials" // Register GRPC gzip decoder to support compressed telemetry
|
||||
_ "google.golang.org/grpc/encoding/gzip"
|
||||
"google.golang.org/grpc/credentials"
|
||||
_ "google.golang.org/grpc/encoding/gzip" // Register GRPC gzip decoder to support compressed telemetry
|
||||
"google.golang.org/grpc/peer"
|
||||
)
|
||||
|
||||
|
|
@ -261,7 +261,7 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
|
|||
msg := &telemetry.Telemetry{}
|
||||
err := proto.Unmarshal(data, msg)
|
||||
if err != nil {
|
||||
c.acc.AddError(fmt.Errorf("Cisco MDT failed to decode: %v", err))
|
||||
c.acc.AddError(fmt.Errorf("failed to decode: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -308,9 +308,8 @@ func (d *DockerLogs) tailContainerLogs(
|
|||
// multiplexed.
|
||||
if hasTTY {
|
||||
return tailStream(acc, tags, container.ID, logReader, "tty")
|
||||
} else {
|
||||
return tailMultiplexed(acc, tags, container.ID, logReader)
|
||||
}
|
||||
return tailMultiplexed(acc, tags, container.ID, logReader)
|
||||
}
|
||||
|
||||
func parseLine(line []byte) (time.Time, string, error) {
|
||||
|
|
|
|||
|
|
@ -53,8 +53,7 @@ const defaultPort = "24242"
|
|||
// Reads stats from all configured servers.
|
||||
func (d *Dovecot) Gather(acc telegraf.Accumulator) error {
|
||||
if !validQuery[d.Type] {
|
||||
return fmt.Errorf("Error: %s is not a valid query type\n",
|
||||
d.Type)
|
||||
return fmt.Errorf("error: %s is not a valid query type", d.Type)
|
||||
}
|
||||
|
||||
if len(d.Servers) == 0 {
|
||||
|
|
|
|||
|
|
@ -123,13 +123,13 @@ func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) erro
|
|||
c, err := net.Dial("unix", socketPath)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not connect to socket '%s': %s", addr, err)
|
||||
return fmt.Errorf("could not connect to socket '%s': %s", addr, err)
|
||||
}
|
||||
|
||||
_, errw := c.Write([]byte("show stat\n"))
|
||||
|
||||
if errw != nil {
|
||||
return fmt.Errorf("Could not write to socket '%s': %s", addr, errw)
|
||||
return fmt.Errorf("could not write to socket '%s': %s", addr, errw)
|
||||
}
|
||||
|
||||
return g.importCsvResult(c, acc, socketPath)
|
||||
|
|
@ -202,9 +202,8 @@ func getSocketAddr(sock string) string {
|
|||
|
||||
if len(socketAddr) >= 2 {
|
||||
return socketAddr[1]
|
||||
} else {
|
||||
return socketAddr[0]
|
||||
}
|
||||
return socketAddr[0]
|
||||
}
|
||||
|
||||
var typeNames = []string{"frontend", "backend", "server", "listener"}
|
||||
|
|
|
|||
|
|
@ -239,17 +239,16 @@ func (r *IntelRDT) createArgsAndStartPQOS(ctx context.Context) {
|
|||
if len(r.parsedCores) != 0 {
|
||||
coresArg := createArgCores(r.parsedCores)
|
||||
args = append(args, coresArg)
|
||||
go r.readData(args, nil, ctx)
|
||||
|
||||
go r.readData(ctx, args, nil)
|
||||
} else if len(r.processesPIDsMap) != 0 {
|
||||
processArg := createArgProcess(r.processesPIDsMap)
|
||||
args = append(args, processArg)
|
||||
go r.readData(args, r.processesPIDsMap, ctx)
|
||||
go r.readData(ctx, args, r.processesPIDsMap)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *IntelRDT) readData(args []string, processesPIDsAssociation map[string]string, ctx context.Context) {
|
||||
func (r *IntelRDT) readData(ctx context.Context, args []string, processesPIDsAssociation map[string]string) {
|
||||
r.wg.Add(1)
|
||||
defer r.wg.Done()
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
|
@ -57,6 +56,7 @@ type Jolokia struct {
|
|||
|
||||
ResponseHeaderTimeout internal.Duration `toml:"response_header_timeout"`
|
||||
ClientTimeout internal.Duration `toml:"client_timeout"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
}
|
||||
|
||||
const sampleConfig = `
|
||||
|
|
@ -143,7 +143,7 @@ func (j *Jolokia) doRequest(req *http.Request) ([]map[string]interface{}, error)
|
|||
|
||||
// Process response
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
|
||||
err = fmt.Errorf("response from url \"%s\" has status code %d (%s), expected %d (%s)",
|
||||
req.RequestURI,
|
||||
resp.StatusCode,
|
||||
http.StatusText(resp.StatusCode),
|
||||
|
|
@ -161,7 +161,7 @@ func (j *Jolokia) doRequest(req *http.Request) ([]map[string]interface{}, error)
|
|||
// Unmarshal json
|
||||
var jsonOut []map[string]interface{}
|
||||
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
|
||||
return nil, fmt.Errorf("Error decoding JSON response: %s: %s", err, body)
|
||||
return nil, fmt.Errorf("error decoding JSON response: %s: %s", err, body)
|
||||
}
|
||||
|
||||
return jsonOut, nil
|
||||
|
|
@ -259,9 +259,8 @@ func (j *Jolokia) extractValues(measurement string, value interface{}, fields ma
|
|||
}
|
||||
|
||||
func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
|
||||
|
||||
if j.jClient == nil {
|
||||
log.Println("W! DEPRECATED: the jolokia plugin has been deprecated " +
|
||||
j.Log.Warn("DEPRECATED: the jolokia plugin has been deprecated " +
|
||||
"in favor of the jolokia2 plugin " +
|
||||
"(https://github.com/influxdata/telegraf/tree/master/plugins/inputs/jolokia2)")
|
||||
|
||||
|
|
@ -299,18 +298,18 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
|
|||
}
|
||||
for i, resp := range out {
|
||||
if status, ok := resp["status"]; ok && status != float64(200) {
|
||||
acc.AddError(fmt.Errorf("Not expected status value in response body (%s:%s mbean=\"%s\" attribute=\"%s\"): %3.f",
|
||||
acc.AddError(fmt.Errorf("not expected status value in response body (%s:%s mbean=\"%s\" attribute=\"%s\"): %3.f",
|
||||
server.Host, server.Port, metrics[i].Mbean, metrics[i].Attribute, status))
|
||||
continue
|
||||
} else if !ok {
|
||||
acc.AddError(fmt.Errorf("Missing status in response body"))
|
||||
acc.AddError(fmt.Errorf("missing status in response body"))
|
||||
continue
|
||||
}
|
||||
|
||||
if values, ok := resp["value"]; ok {
|
||||
j.extractValues(metrics[i].Name, values, fields)
|
||||
} else {
|
||||
acc.AddError(fmt.Errorf("Missing key 'value' in output response\n"))
|
||||
acc.AddError(fmt.Errorf("missing key 'value' in output response"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -264,5 +264,5 @@ func TestHttpInvalidJson(t *testing.T) {
|
|||
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, 0, len(acc.Metrics))
|
||||
assert.Contains(t, err.Error(), "Error decoding JSON response")
|
||||
assert.Contains(t, err.Error(), "error decoding JSON response")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -298,17 +298,15 @@ func (m *OpenConfigTelemetry) collectData(ctx context.Context,
|
|||
acc.AddError(fmt.Errorf("could not subscribe to %s: %v", grpcServer,
|
||||
err))
|
||||
return
|
||||
} else {
|
||||
// Retry with delay. If delay is not provided, use default
|
||||
if m.RetryDelay.Duration > 0 {
|
||||
m.Log.Debugf("Retrying %s with timeout %v", grpcServer,
|
||||
m.RetryDelay.Duration)
|
||||
time.Sleep(m.RetryDelay.Duration)
|
||||
continue
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Retry with delay. If delay is not provided, use default
|
||||
if m.RetryDelay.Duration > 0 {
|
||||
m.Log.Debugf("Retrying %s with timeout %v", grpcServer, m.RetryDelay.Duration)
|
||||
time.Sleep(m.RetryDelay.Duration)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
for {
|
||||
r, err := stream.Recv()
|
||||
|
|
|
|||
|
|
@ -140,11 +140,11 @@ func (k *Kafka) receiver() {
|
|||
return
|
||||
case err := <-k.errs:
|
||||
if err != nil {
|
||||
k.acc.AddError(fmt.Errorf("Consumer Error: %s\n", err))
|
||||
k.acc.AddError(fmt.Errorf("consumer Error: %s", err))
|
||||
}
|
||||
case msg := <-k.in:
|
||||
if k.MaxMessageLen != 0 && len(msg.Value) > k.MaxMessageLen {
|
||||
k.acc.AddError(fmt.Errorf("Message longer than max_message_len (%d > %d)",
|
||||
k.acc.AddError(fmt.Errorf("message longer than max_message_len (%d > %d)",
|
||||
len(msg.Value), k.MaxMessageLen))
|
||||
} else {
|
||||
metrics, err := k.parser.Parse(msg.Value)
|
||||
|
|
@ -173,7 +173,7 @@ func (k *Kafka) Stop() {
|
|||
defer k.Unlock()
|
||||
close(k.done)
|
||||
if err := k.Consumer.Close(); err != nil {
|
||||
k.acc.AddError(fmt.Errorf("Error closing consumer: %s\n", err.Error()))
|
||||
k.acc.AddError(fmt.Errorf("error closing consumer: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ func (k *Kernel) Gather(acc telegraf.Accumulator) error {
|
|||
|
||||
func (k *Kernel) getProcStat() ([]byte, error) {
|
||||
if _, err := os.Stat(k.statFile); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("kernel: %s does not exist!", k.statFile)
|
||||
return nil, fmt.Errorf("kernel: %s does not exist", k.statFile)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ func (k *KernelVmstat) Gather(acc telegraf.Accumulator) error {
|
|||
|
||||
func (k *KernelVmstat) getProcVmstat() ([]byte, error) {
|
||||
if _, err := os.Stat(k.statFile); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("kernel_vmstat: %s does not exist!", k.statFile)
|
||||
return nil, fmt.Errorf("kernel_vmstat: %s does not exist", k.statFile)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,11 +32,11 @@ const (
|
|||
|
||||
// Rcon package errors.
|
||||
var (
|
||||
ErrInvalidWrite = errors.New("Failed to write the payload correctly to remote connection.")
|
||||
ErrInvalidRead = errors.New("Failed to read the response correctly from remote connection.")
|
||||
ErrInvalidChallenge = errors.New("Server failed to mirror request challenge.")
|
||||
ErrUnauthorizedRequest = errors.New("Client not authorized to remote server.")
|
||||
ErrFailedAuthorization = errors.New("Failed to authorize to the remote server.")
|
||||
ErrInvalidWrite = errors.New("failed to write the payload correctly to remote connection")
|
||||
ErrInvalidRead = errors.New("failed to read the response correctly from remote connection")
|
||||
ErrInvalidChallenge = errors.New("server failed to mirror request challenge")
|
||||
ErrUnauthorizedRequest = errors.New("client not authorized to remote server")
|
||||
ErrFailedAuthorization = errors.New("failed to authorize to the remote server")
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package modbus
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"net/url"
|
||||
|
|
@ -34,6 +33,7 @@ type Modbus struct {
|
|||
Coils []fieldContainer `toml:"coils"`
|
||||
HoldingRegisters []fieldContainer `toml:"holding_registers"`
|
||||
InputRegisters []fieldContainer `toml:"input_registers"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
registers []register
|
||||
isConnected bool
|
||||
tcpHandler *mb.TCPClientHandler
|
||||
|
|
@ -341,9 +341,8 @@ func validateFieldContainers(t []fieldContainer, n string) error {
|
|||
canonical_name := item.Measurement + "." + item.Name
|
||||
if nameEncountered[canonical_name] {
|
||||
return fmt.Errorf("name '%s' is duplicated in measurement '%s' '%s' - '%s'", item.Name, item.Measurement, n, item.Name)
|
||||
} else {
|
||||
nameEncountered[canonical_name] = true
|
||||
}
|
||||
nameEncountered[canonical_name] = true
|
||||
|
||||
if n == cInputRegisters || n == cHoldingRegisters {
|
||||
// search byte order
|
||||
|
|
@ -696,7 +695,7 @@ func (m *Modbus) Gather(acc telegraf.Accumulator) error {
|
|||
if err != nil {
|
||||
mberr, ok := err.(*mb.ModbusError)
|
||||
if ok && mberr.ExceptionCode == mb.ExceptionCodeServerDeviceBusy && retry < m.Retries {
|
||||
log.Printf("I! [inputs.modbus] device busy! Retrying %d more time(s)...", m.Retries-retry)
|
||||
m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry)
|
||||
time.Sleep(m.RetriesWaitTime.Duration)
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -102,6 +102,7 @@ func TestCoils(t *testing.T) {
|
|||
Address: []uint16{ct.address},
|
||||
},
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = modbus.Init()
|
||||
|
|
@ -640,6 +641,7 @@ func TestHoldingRegisters(t *testing.T) {
|
|||
Address: hrt.address,
|
||||
},
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = modbus.Init()
|
||||
|
|
@ -694,6 +696,7 @@ func TestRetrySuccessful(t *testing.T) {
|
|||
Address: []uint16{0},
|
||||
},
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = modbus.Init()
|
||||
|
|
@ -739,6 +742,7 @@ func TestRetryFail(t *testing.T) {
|
|||
Address: []uint16{0},
|
||||
},
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = modbus.Init()
|
||||
|
|
@ -772,6 +776,7 @@ func TestRetryFail(t *testing.T) {
|
|||
Address: []uint16{0},
|
||||
},
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = modbus.Init()
|
||||
|
|
|
|||
|
|
@ -366,9 +366,8 @@ func linkMode(s Service) string {
|
|||
func serviceStatus(s Service) string {
|
||||
if s.Status == 0 {
|
||||
return "running"
|
||||
} else {
|
||||
return "failure"
|
||||
}
|
||||
return "failure"
|
||||
}
|
||||
|
||||
func pendingAction(s Service) string {
|
||||
|
|
@ -377,9 +376,8 @@ func pendingAction(s Service) string {
|
|||
return "unknown"
|
||||
}
|
||||
return pendingActions[s.PendingAction-1]
|
||||
} else {
|
||||
return "none"
|
||||
}
|
||||
return "none"
|
||||
}
|
||||
|
||||
func monitoringMode(s Service) string {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package multifile
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
|
|
@ -130,7 +129,7 @@ func (m *MultiFile) Gather(acc telegraf.Accumulator) error {
|
|||
}
|
||||
|
||||
if value == nil {
|
||||
return errors.New(fmt.Sprintf("invalid conversion %v", file.Conversion))
|
||||
return fmt.Errorf("invalid conversion %v", file.Conversion)
|
||||
}
|
||||
|
||||
fields[file.Dest] = value
|
||||
|
|
|
|||
|
|
@ -646,9 +646,8 @@ func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{
|
|||
return v, nil
|
||||
}
|
||||
return v, fmt.Errorf("could not parse value: %q", string(value))
|
||||
} else {
|
||||
return v2.ConvertGlobalVariables(key, value)
|
||||
}
|
||||
return v2.ConvertGlobalVariables(key, value)
|
||||
}
|
||||
|
||||
// gatherSlaveStatuses can be used to get replication analytics
|
||||
|
|
@ -782,42 +781,42 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
|
|||
case "Queries":
|
||||
i, err := strconv.ParseInt(string(val), 10, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
|
||||
acc.AddError(fmt.Errorf("error mysql: parsing %s int value (%s)", key, err))
|
||||
} else {
|
||||
fields["queries"] = i
|
||||
}
|
||||
case "Questions":
|
||||
i, err := strconv.ParseInt(string(val), 10, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
|
||||
acc.AddError(fmt.Errorf("error mysql: parsing %s int value (%s)", key, err))
|
||||
} else {
|
||||
fields["questions"] = i
|
||||
}
|
||||
case "Slow_queries":
|
||||
i, err := strconv.ParseInt(string(val), 10, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
|
||||
acc.AddError(fmt.Errorf("error mysql: parsing %s int value (%s)", key, err))
|
||||
} else {
|
||||
fields["slow_queries"] = i
|
||||
}
|
||||
case "Connections":
|
||||
i, err := strconv.ParseInt(string(val), 10, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
|
||||
acc.AddError(fmt.Errorf("error mysql: parsing %s int value (%s)", key, err))
|
||||
} else {
|
||||
fields["connections"] = i
|
||||
}
|
||||
case "Syncs":
|
||||
i, err := strconv.ParseInt(string(val), 10, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
|
||||
acc.AddError(fmt.Errorf("error mysql: parsing %s int value (%s)", key, err))
|
||||
} else {
|
||||
fields["syncs"] = i
|
||||
}
|
||||
case "Uptime":
|
||||
i, err := strconv.ParseInt(string(val), 10, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
|
||||
acc.AddError(fmt.Errorf("error mysql: parsing %s int value (%s)", key, err))
|
||||
} else {
|
||||
fields["uptime"] = i
|
||||
}
|
||||
|
|
@ -965,7 +964,7 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr
|
|||
case *string:
|
||||
fields[cols[i]] = *v
|
||||
default:
|
||||
return fmt.Errorf("Unknown column type - %T", v)
|
||||
return fmt.Errorf("unknown column type - %T", v)
|
||||
}
|
||||
}
|
||||
acc.AddFields("mysql_user_stats", fields, tags)
|
||||
|
|
@ -1129,7 +1128,7 @@ func getColSlice(l int) ([]interface{}, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Not Supported - %d columns", l)
|
||||
return nil, fmt.Errorf("not Supported - %d columns", l)
|
||||
}
|
||||
|
||||
// gatherPerfTableIOWaits can be used to get total count and time
|
||||
|
|
@ -1855,9 +1854,8 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
|
|||
func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
|
||||
if m.MetricVersion < 2 {
|
||||
return v1.ParseValue(value)
|
||||
} else {
|
||||
return parseValue(value)
|
||||
}
|
||||
return parseValue(value)
|
||||
}
|
||||
|
||||
// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package opcua_client
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
|
|
@ -198,7 +197,10 @@ func (o *OpcUA) Init() error {
|
|||
return err
|
||||
}
|
||||
|
||||
o.setupOptions()
|
||||
err = o.setupOptions()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
"endpoint": o.Endpoint,
|
||||
|
|
@ -207,7 +209,6 @@ func (o *OpcUA) Init() error {
|
|||
o.ReadSuccess = selfstat.Register("opcua", "read_success", tags)
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (o *OpcUA) validateEndpoint() error {
|
||||
|
|
@ -353,10 +354,11 @@ func (o *OpcUA) validateOPCTags() error {
|
|||
if _, ok := nameEncountered[mp]; ok {
|
||||
return fmt.Errorf("name '%s' is duplicated (metric name '%s', tags '%s')",
|
||||
mp.fieldName, mp.metricName, mp.tags)
|
||||
} else {
|
||||
//add it to the set
|
||||
nameEncountered[mp] = struct{}{}
|
||||
}
|
||||
|
||||
//add it to the set
|
||||
nameEncountered[mp] = struct{}{}
|
||||
|
||||
//search identifier type
|
||||
switch node.tag.IdentifierType {
|
||||
case "s", "i", "g", "b":
|
||||
|
|
@ -402,14 +404,14 @@ func Connect(o *OpcUA) error {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.ConnectTimeout))
|
||||
defer cancel()
|
||||
if err := o.client.Connect(ctx); err != nil {
|
||||
return fmt.Errorf("Error in Client Connection: %s", err)
|
||||
return fmt.Errorf("error in Client Connection: %s", err)
|
||||
}
|
||||
|
||||
regResp, err := o.client.RegisterNodes(&ua.RegisterNodesRequest{
|
||||
NodesToRegister: o.nodeIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("RegisterNodes failed: %v", err)
|
||||
return fmt.Errorf("registerNodes failed: %v", err)
|
||||
}
|
||||
|
||||
o.req = &ua.ReadRequest{
|
||||
|
|
@ -420,7 +422,7 @@ func Connect(o *OpcUA) error {
|
|||
|
||||
err = o.getData()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Get Data Failed: %v", err)
|
||||
return fmt.Errorf("get Data Failed: %v", err)
|
||||
}
|
||||
|
||||
default:
|
||||
|
|
@ -430,11 +432,10 @@ func Connect(o *OpcUA) error {
|
|||
}
|
||||
|
||||
func (o *OpcUA) setupOptions() error {
|
||||
|
||||
// Get a list of the endpoints for our target server
|
||||
endpoints, err := opcua.GetEndpoints(o.Endpoint)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if o.Certificate == "" && o.PrivateKey == "" {
|
||||
|
|
@ -457,7 +458,7 @@ func (o *OpcUA) getData() error {
|
|||
o.ReadSuccess.Incr(1)
|
||||
for i, d := range resp.Results {
|
||||
if d.Status != ua.StatusOK {
|
||||
return fmt.Errorf("Status not OK: %v", d.Status)
|
||||
return fmt.Errorf("status not OK: %v", d.Status)
|
||||
}
|
||||
o.nodeData[i].TagName = o.nodes[i].tag.FieldName
|
||||
if d.Value != nil {
|
||||
|
|
|
|||
|
|
@ -128,7 +128,7 @@ func (o *Openldap) Gather(acc telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
} else {
|
||||
acc.AddError(fmt.Errorf("Invalid setting for ssl: %s", o.TLS))
|
||||
acc.AddError(fmt.Errorf("invalid setting for ssl: %s", o.TLS))
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
|
|
@ -208,15 +208,15 @@ func dnToMetric(dn string, o *Openldap) string {
|
|||
metricParts[i], metricParts[j] = metricParts[j], metricParts[i]
|
||||
}
|
||||
return strings.Join(metricParts[1:], "_")
|
||||
} else {
|
||||
metricName := strings.Trim(dn, " ")
|
||||
metricName = strings.Replace(metricName, " ", "_", -1)
|
||||
metricName = strings.ToLower(metricName)
|
||||
metricName = strings.TrimPrefix(metricName, "cn=")
|
||||
metricName = strings.Replace(metricName, strings.ToLower("cn=Monitor"), "", -1)
|
||||
metricName = strings.Replace(metricName, "cn=", "_", -1)
|
||||
return strings.Replace(metricName, ",", "", -1)
|
||||
}
|
||||
|
||||
metricName := strings.Trim(dn, " ")
|
||||
metricName = strings.Replace(metricName, " ", "_", -1)
|
||||
metricName = strings.ToLower(metricName)
|
||||
metricName = strings.TrimPrefix(metricName, "cn=")
|
||||
metricName = strings.Replace(metricName, strings.ToLower("cn=Monitor"), "", -1)
|
||||
metricName = strings.Replace(metricName, "cn=", "_", -1)
|
||||
return strings.Replace(metricName, ",", "", -1)
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
|||
|
|
@ -112,8 +112,7 @@ func (s *Opensmtpd) Gather(acc telegraf.Accumulator) error {
|
|||
|
||||
fields[field], err = strconv.ParseFloat(value, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("Expected a numerical value for %s = %v\n",
|
||||
stat, value))
|
||||
acc.AddError(fmt.Errorf("expected a numerical value for %s = %v", stat, value))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -170,7 +170,7 @@ func importMetric(stat []byte, acc telegraf.Accumulator) error {
|
|||
decoder := xml.NewDecoder(bytes.NewReader(stat))
|
||||
decoder.CharsetReader = charset.NewReaderLabel
|
||||
if err := decoder.Decode(&p); err != nil {
|
||||
return fmt.Errorf("Cannot parse input with error: %v\n", err)
|
||||
return fmt.Errorf("cannot parse input with error: %v", err)
|
||||
}
|
||||
|
||||
tags := map[string]string{
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ func Test_Invalid_Xml(t *testing.T) {
|
|||
|
||||
err := r.Gather(&acc)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "Cannot parse input with error: EOF\n", err.Error())
|
||||
assert.Equal(t, "cannot parse input with error: EOF", err.Error())
|
||||
}
|
||||
|
||||
// We test this by ensure that the error message match the path of default cli
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ func newFcgiClient(h string, args ...interface{}) (*conn, error) {
|
|||
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
|
||||
con, err = net.DialUnix(h, nil, &laddr)
|
||||
default:
|
||||
err = errors.New("fcgi: we only accept int (port) or string (socket) params.")
|
||||
err = errors.New("fcgi: we only accept int (port) or string (socket) params")
|
||||
}
|
||||
fcgi := &conn{
|
||||
rwc: con,
|
||||
|
|
|
|||
|
|
@ -144,7 +144,7 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
|||
if strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://") {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
|
||||
return fmt.Errorf("unable parse server address '%s': %s", addr, err)
|
||||
}
|
||||
socketAddr := strings.Split(u.Host, ":")
|
||||
fcgiIp := socketAddr[0]
|
||||
|
|
@ -188,9 +188,8 @@ func (p *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumula
|
|||
if len(fpmErr) == 0 && err == nil {
|
||||
importMetric(bytes.NewReader(fpmOutput), acc, addr)
|
||||
return nil
|
||||
} else {
|
||||
return fmt.Errorf("Unable parse phpfpm status. Error: %v %v", string(fpmErr), err)
|
||||
}
|
||||
return fmt.Errorf("unable parse phpfpm status, error: %v %v", string(fpmErr), err)
|
||||
}
|
||||
|
||||
// Gather stat using http protocol
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ func (p *Ping) nativePing(destination string) (*pingStats, error) {
|
|||
|
||||
pinger, err := ping.NewPinger(destination)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to create new pinger: %w", err)
|
||||
return nil, fmt.Errorf("failed to create new pinger: %w", err)
|
||||
}
|
||||
|
||||
// Required for windows. Despite the method name, this should work without the need to elevate privileges and has been tested on Windows 10
|
||||
|
|
@ -197,7 +197,7 @@ func (p *Ping) nativePing(destination string) (*pingStats, error) {
|
|||
pinger.Count = p.Count
|
||||
err = pinger.Run()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to run pinger: %w", err)
|
||||
return nil, fmt.Errorf("failed to run pinger: %w", err)
|
||||
}
|
||||
|
||||
ps.Statistics = *pinger.Statistics()
|
||||
|
|
@ -287,11 +287,11 @@ func percentile(values durationSlice, perc int) time.Duration {
|
|||
|
||||
if rankInteger >= count-1 {
|
||||
return values[count-1]
|
||||
} else {
|
||||
upper := values[rankInteger+1]
|
||||
lower := values[rankInteger]
|
||||
return lower + time.Duration(rankFraction*float64(upper-lower))
|
||||
}
|
||||
|
||||
upper := values[rankInteger+1]
|
||||
lower := values[rankInteger]
|
||||
return lower + time.Duration(rankFraction*float64(upper-lower))
|
||||
}
|
||||
|
||||
// Init ensures the plugin is configured correctly.
|
||||
|
|
@ -321,11 +321,11 @@ func (p *Ping) Init() error {
|
|||
} else {
|
||||
i, err := net.InterfaceByName(p.Interface)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get interface: %w", err)
|
||||
return fmt.Errorf("failed to get interface: %w", err)
|
||||
}
|
||||
addrs, err := i.Addrs()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get the address of interface: %w", err)
|
||||
return fmt.Errorf("failed to get the address of interface: %w", err)
|
||||
}
|
||||
p.sourceAddress = addrs[0].(*net.IPNet).IP.String()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
_ "github.com/jackc/pgx/stdlib"
|
||||
_ "github.com/jackc/pgx/stdlib" //to register stdlib from PostgreSQL Driver and Toolkit
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
|
|
|
|||
|
|
@ -189,7 +189,7 @@ func (t *tester) testProcFile2(_ string) ([]byte, error) {
|
|||
}
|
||||
|
||||
func testExecPSError() ([]byte, error) {
|
||||
return []byte("\nSTAT\nD\nI\nL\nR\nR+\nS\nS+\nSNs\nSs\nU\nZ\n"), fmt.Errorf("ERROR!")
|
||||
return []byte("\nSTAT\nD\nI\nL\nR\nR+\nS\nS+\nSNs\nSs\nU\nZ\n"), fmt.Errorf("error")
|
||||
}
|
||||
|
||||
const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ func (p *Proc) Percent(interval time.Duration) (float64, error) {
|
|||
cpu_perc, err := p.Process.Percent(time.Duration(0))
|
||||
if !p.hasCPUTimes && err == nil {
|
||||
p.hasCPUTimes = true
|
||||
return 0, fmt.Errorf("Must call Percent twice to compute percent cpu.")
|
||||
return 0, fmt.Errorf("must call Percent twice to compute percent cpu")
|
||||
}
|
||||
return cpu_perc, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
. "github.com/influxdata/telegraf/plugins/parsers/prometheus/common"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/prometheus/common"
|
||||
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
|
|
@ -55,7 +55,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
|||
for metricName, mf := range metricFamilies {
|
||||
for _, m := range mf.Metric {
|
||||
// reading tags
|
||||
tags := MakeLabels(m, nil)
|
||||
tags := common.MakeLabels(m, nil)
|
||||
|
||||
// reading fields
|
||||
var fields map[string]interface{}
|
||||
|
|
@ -82,7 +82,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
|||
} else {
|
||||
t = now
|
||||
}
|
||||
metric, err := metric.New(metricName, tags, fields, t, ValueType(mf.GetType()))
|
||||
metric, err := metric.New(metricName, tags, fields, t, common.ValueType(mf.GetType()))
|
||||
if err == nil {
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ func (r *RethinkDB) Gather(acc telegraf.Accumulator) error {
|
|||
for _, serv := range r.Servers {
|
||||
u, err := url.Parse(serv)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err))
|
||||
acc.AddError(fmt.Errorf("unable to parse to address '%s': %s", serv, err))
|
||||
continue
|
||||
} else if u.Scheme == "" {
|
||||
// fallback to simple string based address (i.e. "10.0.0.1:10000")
|
||||
|
|
@ -97,7 +97,7 @@ func (r *RethinkDB) gatherServer(server *Server, acc telegraf.Accumulator) error
|
|||
|
||||
server.session, err = gorethink.Connect(connectOpts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to connect to RethinkDB, %s", err.Error())
|
||||
}
|
||||
defer server.session.Close()
|
||||
|
||||
|
|
|
|||
|
|
@ -22,24 +22,24 @@ type Server struct {
|
|||
|
||||
func (s *Server) gatherData(acc telegraf.Accumulator) error {
|
||||
if err := s.getServerStatus(); err != nil {
|
||||
return fmt.Errorf("Failed to get server_status, %s\n", err)
|
||||
return fmt.Errorf("failed to get server_status, %s", err)
|
||||
}
|
||||
|
||||
if err := s.validateVersion(); err != nil {
|
||||
return fmt.Errorf("Failed version validation, %s\n", err.Error())
|
||||
return fmt.Errorf("failed version validation, %s", err.Error())
|
||||
}
|
||||
|
||||
if err := s.addClusterStats(acc); err != nil {
|
||||
fmt.Printf("error adding cluster stats, %s\n", err.Error())
|
||||
return fmt.Errorf("Error adding cluster stats, %s\n", err.Error())
|
||||
return fmt.Errorf("error adding cluster stats, %s", err.Error())
|
||||
}
|
||||
|
||||
if err := s.addMemberStats(acc); err != nil {
|
||||
return fmt.Errorf("Error adding member stats, %s\n", err.Error())
|
||||
return fmt.Errorf("error adding member stats, %s", err.Error())
|
||||
}
|
||||
|
||||
if err := s.addTableStats(acc); err != nil {
|
||||
return fmt.Errorf("Error adding table stats, %s\n", err.Error())
|
||||
return fmt.Errorf("error adding table stats, %s", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -58,7 +58,7 @@ func (s *Server) validateVersion() error {
|
|||
|
||||
majorVersion, err := strconv.Atoi(strings.Split(versionString, "")[0])
|
||||
if err != nil || majorVersion < 2 {
|
||||
return fmt.Errorf("unsupported major version %s\n", versionString)
|
||||
return fmt.Errorf("unsupported major version %s", versionString)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -80,7 +80,7 @@ func (s *Server) getServerStatus() error {
|
|||
}
|
||||
host, port, err := net.SplitHostPort(s.Url.Host)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to determine provided hostname from %s\n", s.Url.Host)
|
||||
return fmt.Errorf("unable to determine provided hostname from %s", s.Url.Host)
|
||||
}
|
||||
driverPort, _ := strconv.Atoi(port)
|
||||
for _, ss := range serverStatuses {
|
||||
|
|
@ -113,12 +113,12 @@ var ClusterTracking = []string{
|
|||
func (s *Server) addClusterStats(acc telegraf.Accumulator) error {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cluster stats query error, %s\n", err.Error())
|
||||
return fmt.Errorf("cluster stats query error, %s", err.Error())
|
||||
}
|
||||
defer cursor.Close()
|
||||
var clusterStats stats
|
||||
if err := cursor.One(&clusterStats); err != nil {
|
||||
return fmt.Errorf("failure to parse cluster stats, %s\n", err.Error())
|
||||
return fmt.Errorf("failure to parse cluster stats, %s", err.Error())
|
||||
}
|
||||
|
||||
tags := s.getDefaultTags()
|
||||
|
|
@ -141,12 +141,12 @@ var MemberTracking = []string{
|
|||
func (s *Server) addMemberStats(acc telegraf.Accumulator) error {
|
||||
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("member stats query error, %s\n", err.Error())
|
||||
return fmt.Errorf("member stats query error, %s", err.Error())
|
||||
}
|
||||
defer cursor.Close()
|
||||
var memberStats stats
|
||||
if err := cursor.One(&memberStats); err != nil {
|
||||
return fmt.Errorf("failure to parse member stats, %s\n", err.Error())
|
||||
return fmt.Errorf("failure to parse member stats, %s", err.Error())
|
||||
}
|
||||
|
||||
tags := s.getDefaultTags()
|
||||
|
|
@ -165,7 +165,7 @@ var TableTracking = []string{
|
|||
func (s *Server) addTableStats(acc telegraf.Accumulator) error {
|
||||
tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("table stats query error, %s\n", err.Error())
|
||||
return fmt.Errorf("table stats query error, %s", err.Error())
|
||||
}
|
||||
|
||||
defer tablesCursor.Close()
|
||||
|
|
@ -179,12 +179,12 @@ func (s *Server) addTableStats(acc telegraf.Accumulator) error {
|
|||
Get([]string{"table_server", table.Id, s.serverStatus.Id}).
|
||||
Run(s.session)
|
||||
if err != nil {
|
||||
return fmt.Errorf("table stats query error, %s\n", err.Error())
|
||||
return fmt.Errorf("table stats query error, %s", err.Error())
|
||||
}
|
||||
defer cursor.Close()
|
||||
var ts tableStats
|
||||
if err := cursor.One(&ts); err != nil {
|
||||
return fmt.Errorf("failure to parse table stats, %s\n", err.Error())
|
||||
return fmt.Errorf("failure to parse table stats, %s", err.Error())
|
||||
}
|
||||
|
||||
tags := s.getDefaultTags()
|
||||
|
|
|
|||
|
|
@ -300,15 +300,15 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
|
|||
if err != nil {
|
||||
s.Log.Errorf("Reading SNMPtranslate file error: %s", err.Error())
|
||||
return err
|
||||
} else {
|
||||
for _, line := range strings.Split(string(data), "\n") {
|
||||
oids := strings.Fields(string(line))
|
||||
if len(oids) == 2 && oids[1] != "" {
|
||||
oid_name := oids[0]
|
||||
oid := oids[1]
|
||||
fillnode(s.initNode, oid_name, strings.Split(string(oid), "."))
|
||||
s.nameToOid[oid_name] = oid
|
||||
}
|
||||
}
|
||||
|
||||
for _, line := range strings.Split(string(data), "\n") {
|
||||
oids := strings.Fields(line)
|
||||
if len(oids) == 2 && oids[1] != "" {
|
||||
oid_name := oids[0]
|
||||
oid := oids[1]
|
||||
fillnode(s.initNode, oid_name, strings.Split(oid, "."))
|
||||
s.nameToOid[oid_name] = oid
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ func (s *SystemPS) NetConnections() ([]net.ConnectionStat, error) {
|
|||
|
||||
func (s *SystemPS) DiskIO(names []string) (map[string]disk.IOCountersStat, error) {
|
||||
m, err := disk.IOCounters(names...)
|
||||
if err == internal.NotImplementedError {
|
||||
if err == internal.ErrorNotImplemented {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -290,17 +290,17 @@ func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.M
|
|||
// line from the file.
|
||||
if firstLine {
|
||||
return parser.Parse([]byte(line))
|
||||
} else {
|
||||
m, err := parser.ParseLine(line)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if m != nil {
|
||||
return []telegraf.Metric{m}, nil
|
||||
}
|
||||
return []telegraf.Metric{}, nil
|
||||
}
|
||||
|
||||
m, err := parser.ParseLine(line)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if m != nil {
|
||||
return []telegraf.Metric{m}, nil
|
||||
}
|
||||
return []telegraf.Metric{}, nil
|
||||
default:
|
||||
return parser.Parse([]byte(line))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -149,8 +149,7 @@ func (s *Varnish) Gather(acc telegraf.Accumulator) error {
|
|||
|
||||
sectionMap[section][field], err = strconv.ParseUint(value, 10, 64)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("Expected a numeric value for %s = %v\n",
|
||||
stat, value))
|
||||
acc.AddError(fmt.Errorf("expected a numeric value for %s = %v", stat, value))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -72,9 +72,8 @@ func (z *Zookeeper) dial(ctx context.Context, addr string) (net.Conn, error) {
|
|||
dialer.Deadline = deadline
|
||||
}
|
||||
return tls.DialWithDialer(&dialer, "tcp", addr, z.tlsConfig)
|
||||
} else {
|
||||
return dialer.DialContext(ctx, "tcp", addr)
|
||||
}
|
||||
return dialer.DialContext(ctx, "tcp", addr)
|
||||
}
|
||||
|
||||
// Gather reads stats from all configured servers accumulates stats
|
||||
|
|
@ -132,7 +131,7 @@ func (z *Zookeeper) gatherServer(ctx context.Context, address string, acc telegr
|
|||
|
||||
service := strings.Split(address, ":")
|
||||
if len(service) != 2 {
|
||||
return fmt.Errorf("Invalid service address: %s", address)
|
||||
return fmt.Errorf("invalid service address: %s", address)
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package all
|
||||
|
||||
import (
|
||||
//Blank imports for plugins to register themselves
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/application_insights"
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
|
|
@ -14,9 +13,10 @@ import (
|
|||
)
|
||||
|
||||
type Amon struct {
|
||||
ServerKey string
|
||||
AmonInstance string
|
||||
Timeout internal.Duration
|
||||
ServerKey string `toml:"server_key"`
|
||||
AmonInstance string `toml:"amon_instance"`
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
client *http.Client
|
||||
}
|
||||
|
|
@ -76,7 +76,7 @@ func (a *Amon) Write(metrics []telegraf.Metric) error {
|
|||
metricCounter++
|
||||
}
|
||||
} else {
|
||||
log.Printf("I! unable to build Metric for %s, skipping\n", m.Name())
|
||||
a.Log.Infof("Unable to build Metric for %s, skipping", m.Name())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -84,22 +84,22 @@ func (a *Amon) Write(metrics []telegraf.Metric) error {
|
|||
copy(ts.Series, tempSeries[0:])
|
||||
tsBytes, err := json.Marshal(ts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to marshal TimeSeries, %s", err.Error())
|
||||
}
|
||||
req, err := http.NewRequest("POST", a.authenticatedUrl(), bytes.NewBuffer(tsBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to create http.Request, %s", err.Error())
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
resp, err := a.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
||||
return fmt.Errorf("error POSTing metrics, %s", err.Error())
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 209 {
|
||||
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
|
||||
return fmt.Errorf("received bad status code, %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package amqp
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
|
@ -55,6 +54,7 @@ type AMQP struct {
|
|||
Timeout internal.Duration `toml:"timeout"`
|
||||
UseBatchFormat bool `toml:"use_batch_format"`
|
||||
ContentEncoding string `toml:"content_encoding"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
tls.ClientConfig
|
||||
|
||||
serializer serializers.Serializer
|
||||
|
|
@ -267,7 +267,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
|
|||
}
|
||||
|
||||
if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 {
|
||||
log.Printf("D! Output [amqp] sent MaxMessages; closing connection")
|
||||
q.Log.Debug("Sent MaxMessages; closing connection")
|
||||
q.client.Close()
|
||||
q.client = nil
|
||||
}
|
||||
|
|
@ -296,22 +296,22 @@ func (q *AMQP) publish(key string, body []byte) error {
|
|||
func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) {
|
||||
if q.UseBatchFormat {
|
||||
return q.serializer.SerializeBatch(metrics)
|
||||
} else {
|
||||
var buf bytes.Buffer
|
||||
for _, metric := range metrics {
|
||||
octets, err := q.serializer.Serialize(metric)
|
||||
if err != nil {
|
||||
log.Printf("D! [outputs.amqp] Could not serialize metric: %v", err)
|
||||
continue
|
||||
}
|
||||
_, err = buf.Write(octets)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
body := buf.Bytes()
|
||||
return body, nil
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
for _, metric := range metrics {
|
||||
octets, err := q.serializer.Serialize(metric)
|
||||
if err != nil {
|
||||
q.Log.Debugf("Could not serialize metric: %v", err)
|
||||
continue
|
||||
}
|
||||
_, err = buf.Write(octets)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
body := buf.Bytes()
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
|
||||
|
|
|
|||
|
|
@@ -2,7 +2,6 @@ package application_insights

import (
"fmt"
"log"
"math"
"time"
"unsafe"

@@ -23,22 +22,17 @@ type DiagnosticsMessageSubscriber interface {
}

type ApplicationInsights struct {
InstrumentationKey string
EndpointURL string
Timeout internal.Duration
EnableDiagnosticLogging bool
ContextTagSources map[string]string
diagMsgSubscriber DiagnosticsMessageSubscriber
transmitter TelemetryTransmitter
diagMsgListener appinsights.DiagnosticsMessageListener
}
InstrumentationKey string `toml:"instrumentation_key"`
EndpointURL string `toml:"endpoint_url"`
Timeout internal.Duration `toml:"timeout"`
EnableDiagnosticLogging bool `toml:"enable_diagnostic_logging"`
ContextTagSources map[string]string `toml:"context_tag_sources"`
Log telegraf.Logger `toml:"-"`

const (
Error = "E! "
Warning = "W! "
Info = "I! "
Debug = "D! "
)
diagMsgSubscriber DiagnosticsMessageSubscriber
transmitter TelemetryTransmitter
diagMsgListener appinsights.DiagnosticsMessageListener
}

var (
sampleConfig = `

@@ -76,7 +70,7 @@ func (a *ApplicationInsights) Description() string {

func (a *ApplicationInsights) Connect() error {
if a.InstrumentationKey == "" {
return fmt.Errorf("Instrumentation key is required")
return fmt.Errorf("instrumentation key is required")
}

if a.transmitter == nil {

@@ -85,7 +79,7 @@ func (a *ApplicationInsights) Connect() error {

if a.EnableDiagnosticLogging && a.diagMsgSubscriber != nil {
a.diagMsgListener = a.diagMsgSubscriber.Subscribe(func(msg string) error {
logOutputMsg(Info, "%s", msg)
a.Log.Info(msg)
return nil
})
}

@@ -117,9 +111,9 @@ func (a *ApplicationInsights) Close() error {

select {
case <-a.transmitter.Close():
logOutputMsg(Info, "Closed")
a.Log.Info("Closed")
case <-time.After(a.Timeout.Duration):
logOutputMsg(Warning, "Close operation timed out after %v", a.Timeout.Duration)
a.Log.Warnf("Close operation timed out after %v", a.Timeout.Duration)
}

return nil

@@ -139,15 +133,12 @@ func (a *ApplicationInsights) createTelemetry(metric telegraf.Metric) []appinsig
telemetry := a.createSimpleMetricTelemetry(metric, "value", false)
if telemetry != nil {
return []appinsights.Telemetry{telemetry}
} else {
return nil
}
} else {
// AppInsights does not support multi-dimensional metrics at the moment, so we need to disambiguate resulting telemetry
// by adding field name as the telemetry name suffix
retval := a.createTelemetryForUnusedFields(metric, nil)
return retval
return nil
}
// AppInsights does not support multi-dimensional metrics at the moment, so we need to disambiguate resulting telemetry
// by adding field name as the telemetry name suffix
return a.createTelemetryForUnusedFields(metric, nil)
}
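The createTelemetry hunk above is the typical [rule.indent-error-flow] rewrite: when an if branch ends in a return, the else block is dropped and its body is outdented instead of nesting the remaining logic. A minimal, self-contained sketch of the pattern (names are illustrative, not taken from this commit):

package main

import "fmt"

// pickValue shows the shape revive prefers: no else after a terminating return.
func pickValue(ok bool) string {
	if ok {
		return "value"
	}
	return "fallback"
}

func main() {
	fmt.Println(pickValue(true), pickValue(false))
}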
|
||||
|
||||
func (a *ApplicationInsights) createSimpleMetricTelemetry(metric telegraf.Metric, fieldName string, useFieldNameInTelemetryName bool) *appinsights.MetricTelemetry {
|
||||
|
|
@ -251,7 +242,7 @@ func getFloat64TelemetryPropertyValue(
|
|||
return metricValue, nil
|
||||
}
|
||||
|
||||
return 0.0, fmt.Errorf("No field from the candidate list was found in the metric")
|
||||
return 0.0, fmt.Errorf("no field from the candidate list was found in the metric")
|
||||
}
|
||||
|
||||
func getIntTelemetryPropertyValue(
|
||||
|
|
@ -277,7 +268,7 @@ func getIntTelemetryPropertyValue(
|
|||
return metricValue, nil
|
||||
}
|
||||
|
||||
return 0, fmt.Errorf("No field from the candidate list was found in the metric")
|
||||
return 0, fmt.Errorf("no field from the candidate list was found in the metric")
|
||||
}
|
||||
|
||||
func contains(set []string, val string) bool {
|
||||
|
|
@ -320,11 +311,11 @@ func toInt(value interface{}) (int, error) {
|
|||
case uint64:
|
||||
if is32Bit {
|
||||
if v > math.MaxInt32 {
|
||||
return 0, fmt.Errorf("Value [%d] out of range of 32-bit integers", v)
|
||||
return 0, fmt.Errorf("value [%d] out of range of 32-bit integers", v)
|
||||
}
|
||||
} else {
|
||||
if v > math.MaxInt64 {
|
||||
return 0, fmt.Errorf("Value [%d] out of range of 64-bit integers", v)
|
||||
return 0, fmt.Errorf("value [%d] out of range of 64-bit integers", v)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -333,7 +324,7 @@ func toInt(value interface{}) (int, error) {
|
|||
case int64:
|
||||
if is32Bit {
|
||||
if v > math.MaxInt32 || v < math.MinInt32 {
|
||||
return 0, fmt.Errorf("Value [%d] out of range of 32-bit integers", v)
|
||||
return 0, fmt.Errorf("value [%d] out of range of 32-bit integers", v)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -343,10 +334,6 @@ func toInt(value interface{}) (int, error) {
|
|||
return 0.0, fmt.Errorf("[%s] cannot be converted to an int value", value)
|
||||
}
|
||||
|
||||
func logOutputMsg(level string, format string, v ...interface{}) {
|
||||
log.Printf(level+"[outputs.application_insights] "+format, v...)
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("application_insights", func() telegraf.Output {
|
||||
return &ApplicationInsights{
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package application_insights
|
||||
|
||||
import (
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -25,6 +26,7 @@ func TestConnectFailsIfNoIkey(t *testing.T) {
|
|||
transmitter: transmitter,
|
||||
// Very long timeout to ensure we do not rely on timeouts for closing the transmitter
|
||||
Timeout: internal.Duration{Duration: time.Hour},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err := ai.Connect()
|
||||
|
|
@ -40,6 +42,7 @@ func TestOutputCloseTimesOut(t *testing.T) {
|
|||
ai := ApplicationInsights{
|
||||
transmitter: transmitter,
|
||||
Timeout: internal.Duration{Duration: time.Millisecond * 50},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err := ai.Close()
|
||||
|
|
@ -67,6 +70,7 @@ func TestCloseRemovesDiagMsgListener(t *testing.T) {
|
|||
EnableDiagnosticLogging: true,
|
||||
diagMsgSubscriber: diagMsgSubscriber,
|
||||
InstrumentationKey: "1234", // Fake, but necessary to enable tracking
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err := ai.Connect()
|
||||
|
|
@ -150,6 +154,7 @@ func TestAggregateMetricCreated(t *testing.T) {
|
|||
ai := ApplicationInsights{
|
||||
transmitter: transmitter,
|
||||
InstrumentationKey: "1234", // Fake, but necessary to enable tracking
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = ai.Connect()
|
||||
|
|
@ -208,6 +213,7 @@ func TestSimpleMetricCreated(t *testing.T) {
|
|||
ai := ApplicationInsights{
|
||||
transmitter: transmitter,
|
||||
InstrumentationKey: "1234", // Fake, but necessary to enable tracking
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = ai.Connect()
|
||||
|
|
@ -278,6 +284,7 @@ func TestTagsAppliedToTelemetry(t *testing.T) {
|
|||
ai := ApplicationInsights{
|
||||
transmitter: transmitter,
|
||||
InstrumentationKey: "1234", // Fake, but necessary to enable tracking
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = ai.Connect()
|
||||
|
|
@ -319,6 +326,7 @@ func TestContextTagsSetOnSimpleTelemetry(t *testing.T) {
|
|||
"ai.cloud.roleInstance": "kubernetes_pod_name",
|
||||
"ai.user.id": "nonexistent",
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = ai.Connect()
|
||||
|
|
@ -356,6 +364,7 @@ func TestContextTagsSetOnAggregateTelemetry(t *testing.T) {
|
|||
"ai.cloud.roleInstance": "kubernetes_pod_name",
|
||||
"ai.user.id": "nonexistent",
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
err = ai.Connect()
|
||||
|
|
|
|||
|
|
@ -11,11 +11,11 @@ type Transmitter struct {
|
|||
func NewTransmitter(ikey string, endpointURL string) *Transmitter {
|
||||
if len(endpointURL) == 0 {
|
||||
return &Transmitter{client: appinsights.NewTelemetryClient(ikey)}
|
||||
} else {
|
||||
telemetryConfig := appinsights.NewTelemetryConfiguration(ikey)
|
||||
telemetryConfig.EndpointUrl = endpointURL
|
||||
return &Transmitter{client: appinsights.NewTelemetryClientFromConfig(telemetryConfig)}
|
||||
}
|
||||
|
||||
telemetryConfig := appinsights.NewTelemetryConfiguration(ikey)
|
||||
telemetryConfig.EndpointUrl = endpointURL
|
||||
return &Transmitter{client: appinsights.NewTelemetryClientFromConfig(telemetryConfig)}
|
||||
}
|
||||
|
||||
func (t *Transmitter) Track(telemetry appinsights.Telemetry) {
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import (
|
|||
"fmt"
|
||||
"hash/fnv"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
|
@ -27,11 +26,12 @@ import (
|
|||
// service
|
||||
type AzureMonitor struct {
|
||||
Timeout internal.Duration
|
||||
NamespacePrefix string `toml:"namespace_prefix"`
|
||||
StringsAsDimensions bool `toml:"strings_as_dimensions"`
|
||||
Region string
|
||||
ResourceID string `toml:"resource_id"`
|
||||
EndpointUrl string `toml:"endpoint_url"`
|
||||
NamespacePrefix string `toml:"namespace_prefix"`
|
||||
StringsAsDimensions bool `toml:"strings_as_dimensions"`
|
||||
Region string `toml:"region"`
|
||||
ResourceID string `toml:"resource_id"`
|
||||
EndpointUrl string `toml:"endpoint_url"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
url string
|
||||
auth autorest.Authorizer
|
||||
|
|
@ -62,14 +62,14 @@ func (m *virtualMachineMetadata) ResourceID() string {
|
|||
m.Compute.ResourceGroupName,
|
||||
m.Compute.VMScaleSetName,
|
||||
)
|
||||
} else {
|
||||
return fmt.Sprintf(
|
||||
resourceIDTemplate,
|
||||
m.Compute.SubscriptionID,
|
||||
m.Compute.ResourceGroupName,
|
||||
m.Compute.Name,
|
||||
)
|
||||
}
|
||||
|
||||
return fmt.Sprintf(
|
||||
resourceIDTemplate,
|
||||
m.Compute.SubscriptionID,
|
||||
m.Compute.ResourceGroupName,
|
||||
m.Compute.Name,
|
||||
)
|
||||
}
|
||||
|
||||
type dimension struct {
|
||||
|
|
@ -189,7 +189,7 @@ func (a *AzureMonitor) Connect() error {
|
|||
a.url = fmt.Sprintf(urlOverrideTemplate, endpointUrl, resourceID)
|
||||
}
|
||||
|
||||
log.Printf("D! Writing to Azure Monitor URL: %s", a.url)
|
||||
a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)
|
||||
|
||||
a.auth, err = auth.NewAuthorizerFromEnvironmentWithResource(defaultAuthResource)
|
||||
if err != nil {
|
||||
|
|
@ -279,14 +279,14 @@ func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
|
|||
if azm, ok := azmetrics[id]; !ok {
|
||||
amm, err := translate(m, a.NamespacePrefix)
|
||||
if err != nil {
|
||||
log.Printf("E! [outputs.azure_monitor]: could not create azure metric for %q; discarding point", m.Name())
|
||||
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
|
||||
continue
|
||||
}
|
||||
azmetrics[id] = amm
|
||||
} else {
|
||||
amm, err := translate(m, a.NamespacePrefix)
|
||||
if err != nil {
|
||||
log.Printf("E! [outputs.azure_monitor]: could not create azure metric for %q; discarding point", m.Name())
|
||||
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -611,7 +611,7 @@ func (a *AzureMonitor) Push() []telegraf.Metric {
|
|||
)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("E! [outputs.azure_monitor]: could not create metric for aggregation %q; discarding point", agg.name)
|
||||
a.Log.Errorf("Could not create metric for aggregation %q; discarding point", agg.name)
|
||||
}
|
||||
|
||||
metrics = append(metrics, m)
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ func TestAggregate(t *testing.T) {
|
|||
plugin: &AzureMonitor{
|
||||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
|
|
@ -52,6 +53,7 @@ func TestAggregate(t *testing.T) {
|
|||
plugin: &AzureMonitor{
|
||||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
|
|
@ -75,6 +77,7 @@ func TestAggregate(t *testing.T) {
|
|||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
StringsAsDimensions: true,
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
|
|
@ -116,6 +119,7 @@ func TestAggregate(t *testing.T) {
|
|||
plugin: &AzureMonitor{
|
||||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
Log: testutil.Logger{},
|
||||
cache: make(map[time.Time]map[uint64]*aggregate, 36),
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
|
|
@ -153,6 +157,7 @@ func TestAggregate(t *testing.T) {
|
|||
plugin: &AzureMonitor{
|
||||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
Log: testutil.Logger{},
|
||||
cache: make(map[time.Time]map[uint64]*aggregate, 36),
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
|
|
@ -262,6 +267,7 @@ func TestWrite(t *testing.T) {
|
|||
plugin: &AzureMonitor{
|
||||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
|
|
@ -282,6 +288,7 @@ func TestWrite(t *testing.T) {
|
|||
plugin: &AzureMonitor{
|
||||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
|
|
@ -308,6 +315,7 @@ func TestWrite(t *testing.T) {
|
|||
plugin: &AzureMonitor{
|
||||
Region: "test",
|
||||
ResourceID: "/test",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"cloud.google.com/go/pubsub"
|
||||
|
|
@ -79,6 +78,8 @@ type PubSub struct {
|
|||
PublishTimeout internal.Duration `toml:"publish_timeout"`
|
||||
Base64Data bool `toml:"base64_data"`
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
t topic
|
||||
c *pubsub.Client
|
||||
|
||||
|
|
@ -111,9 +112,8 @@ func (ps *PubSub) Connect() error {
|
|||
|
||||
if ps.stubTopic == nil {
|
||||
return ps.initPubSubClient()
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *PubSub) Close() error {
|
||||
|
|
@ -230,7 +230,7 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
|
|||
for i, m := range metrics {
|
||||
b, err := ps.serializer.Serialize(m)
|
||||
if err != nil {
|
||||
log.Printf("D! [outputs.cloud_pubsub] Could not serialize metric: %v", err)
|
||||
ps.Log.Debugf("Could not serialize metric: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
_ "github.com/jackc/pgx/stdlib"
|
||||
_ "github.com/jackc/pgx/stdlib" //to register stdlib from PostgreSQL Driver and Toolkit
|
||||
)
|
||||
|
||||
const MaxInt64 = int64(^uint64(0) >> 1)
|
||||
|
|
@ -126,9 +126,8 @@ func escapeValue(val interface{}) (string, error) {
|
|||
// possible value.
|
||||
if t <= uint64(MaxInt64) {
|
||||
return strconv.FormatInt(int64(t), 10), nil
|
||||
} else {
|
||||
return strconv.FormatInt(MaxInt64, 10), nil
|
||||
}
|
||||
return strconv.FormatInt(MaxInt64, 10), nil
|
||||
case bool:
|
||||
return strconv.FormatBool(t), nil
|
||||
case time.Time:
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
|
@ -16,10 +15,11 @@ import (
|
|||
)
|
||||
|
||||
type Datadog struct {
|
||||
Apikey string
|
||||
Timeout internal.Duration
|
||||
Apikey string `toml:"apikey"`
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
URL string `toml:"url"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
URL string `toml:"url"`
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
|
|
@ -96,7 +96,7 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
|
|||
metricCounter++
|
||||
}
|
||||
} else {
|
||||
log.Printf("I! unable to build Metric for %s due to error '%v', skipping\n", m.Name(), err)
|
||||
d.Log.Infof("Unable to build Metric for %s due to error '%v', skipping", m.Name(), err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@@ -109,22 +109,22 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts)
if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
return fmt.Errorf("unable to marshal TimeSeries, %s", err.Error())
}
req, err := http.NewRequest("POST", d.authenticatedUrl(), bytes.NewBuffer(tsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", strings.Replace(err.Error(), d.Apikey, redactedApiKey, -1))
return fmt.Errorf("unable to create http.Request, %s", strings.Replace(err.Error(), d.Apikey, redactedApiKey, -1))
}
req.Header.Add("Content-Type", "application/json")

resp, err := d.client.Do(req)
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", strings.Replace(err.Error(), d.Apikey, redactedApiKey, -1))
return fmt.Errorf("error POSTing metrics, %s", strings.Replace(err.Error(), d.Apikey, redactedApiKey, -1))
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode > 209 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
return fmt.Errorf("received bad status code, %d", resp.StatusCode)
}

return nil
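Each message change in this hunk applies [rule.error-strings]: error text starts lower-case and drops the trailing \n, because callers typically wrap or log the error rather than print it verbatim. A hedged sketch of the convention with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

func post() error {
	// lower-case, no trailing punctuation or newline
	return errors.New("received bad status code, 500")
}

func main() {
	if err := post(); err != nil {
		// wrapping reads naturally because the inner message is not capitalized
		fmt.Println(fmt.Errorf("datadog write failed: %w", err))
	}
}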
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ func TestBadStatusCode(t *testing.T) {
|
|||
if err == nil {
|
||||
t.Errorf("error expected but none returned")
|
||||
} else {
|
||||
require.EqualError(t, fmt.Errorf("received bad status code, 500\n"), err.Error())
|
||||
require.EqualError(t, fmt.Errorf("received bad status code, 500"), err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@@ -101,7 +101,7 @@ func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.R
s := stderr

if err != nil {
if err == internal.TimeoutErr {
if err == internal.ErrTimeout {
return fmt.Errorf("%q timed out and was killed", command)
}
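This call site tracks the rename of internal.TimeoutErr to internal.ErrTimeout: exported sentinel errors conventionally carry an Err prefix. A small standalone sketch of the convention, using a local sentinel rather than the real internal package:

package main

import (
	"errors"
	"fmt"
)

// ErrTimeout uses the Err<Name> form expected for exported sentinel errors.
var ErrTimeout = errors.New("command timed out")

func run() error { return fmt.Errorf("wait failed: %w", ErrTimeout) }

func main() {
	if errors.Is(run(), ErrTimeout) {
		fmt.Println("treat as timeout")
	}
}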
@ -4,7 +4,6 @@ import (
|
|||
"crypto/tls"
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"time"
|
||||
|
|
@@ -16,15 +15,17 @@ import (
)

type Graphite struct {
GraphiteTagSupport bool
GraphiteSeparator string
GraphiteTagSupport bool `toml:"graphite_tag_support"`
GraphiteSeparator string `toml:"graphite_separator"`
// URL is only for backwards compatibility
Servers []string
Prefix string
Template string
Templates []string
Timeout int
conns []net.Conn
Servers []string `toml:"servers"`
Prefix string `toml:"prefix"`
Template string `toml:"template"`
Templates []string `toml:"templates"`
Timeout int `toml:"timeout"`
Log telegraf.Logger `toml:"-"`

conns []net.Conn
tlsint.ClientConfig
}
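Alongside the TOML tags, the struct gains a Log telegraf.Logger field excluded from configuration with the `toml:"-"` tag, so the agent injects a per-plugin logger and the package-level log calls below become g.Log.Errorf and friends. A rough, self-contained sketch of that shape; the Logger interface here only mirrors the few methods used in this diff, and examplePlugin is hypothetical:

package main

import "fmt"

// Logger mirrors the subset of telegraf.Logger exercised in this diff.
type Logger interface {
	Debugf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

type stdLogger struct{}

func (stdLogger) Debugf(f string, a ...interface{}) { fmt.Printf("D! "+f+"\n", a...) }
func (stdLogger) Errorf(f string, a ...interface{}) { fmt.Printf("E! "+f+"\n", a...) }

// examplePlugin stands in for an output plugin; real plugins declare Log telegraf.Logger `toml:"-"`.
type examplePlugin struct {
	Prefix string `toml:"prefix"`
	Log    Logger `toml:"-"` // injected by the agent, never read from config
}

func (p *examplePlugin) send(batch []byte) error {
	if len(batch) == 0 {
		p.Log.Debugf("nothing to send for prefix %q", p.Prefix)
		return nil
	}
	p.Log.Errorf("dropping %d bytes (sketch only)", len(batch))
	return nil
}

func main() {
	p := &examplePlugin{Prefix: "my.prefix", Log: stdLogger{}}
	p.send(nil)
	p.send([]byte("x"))
}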
|
||||
|
||||
|
|
@ -124,22 +125,22 @@ func (g *Graphite) Description() string {
|
|||
// We can detect that by finding an eof
|
||||
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
|
||||
// props to Tv via the authors of carbon-relay-ng` for this trick.
|
||||
func checkEOF(conn net.Conn) {
|
||||
func (g *Graphite) checkEOF(conn net.Conn) {
|
||||
b := make([]byte, 1024)
|
||||
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
|
||||
num, err := conn.Read(b)
|
||||
if err == io.EOF {
|
||||
log.Printf("E! Conn %s is closed. closing conn explicitly", conn)
|
||||
g.Log.Errorf("Conn %s is closed. closing conn explicitly", conn)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
// just in case i misunderstand something or the remote behaves badly
|
||||
if num != 0 {
|
||||
log.Printf("I! conn %s .conn.Read data? did not expect that. data: %s\n", conn, b[:num])
|
||||
g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num])
|
||||
}
|
||||
// Log non-timeout errors or close.
|
||||
if e, ok := err.(net.Error); !(ok && e.Timeout()) {
|
||||
log.Printf("E! conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s\n", conn, err)
|
||||
g.Log.Errorf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
|
@ -157,7 +158,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
|||
for _, metric := range metrics {
|
||||
buf, err := s.Serialize(metric)
|
||||
if err != nil {
|
||||
log.Printf("E! Error serializing some metrics to graphite: %s", err.Error())
|
||||
g.Log.Errorf("Error serializing some metrics to graphite: %s", err.Error())
|
||||
}
|
||||
batch = append(batch, buf...)
|
||||
}
|
||||
|
|
@ -166,7 +167,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
// try to reconnect and retry to send
|
||||
if err != nil {
|
||||
log.Println("E! Graphite: Reconnecting and retrying: ")
|
||||
g.Log.Error("Graphite: Reconnecting and retrying...")
|
||||
g.Connect()
|
||||
err = g.send(batch)
|
||||
}
|
||||
|
|
@ -176,7 +177,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
func (g *Graphite) send(batch []byte) error {
|
||||
// This will get set to nil if a successful write occurs
|
||||
err := errors.New("Could not write to any Graphite server in cluster\n")
|
||||
err := errors.New("could not write to any Graphite server in cluster")
|
||||
|
||||
// Send data to a random server
|
||||
p := rand.Perm(len(g.conns))
|
||||
|
|
@ -184,10 +185,10 @@ func (g *Graphite) send(batch []byte) error {
|
|||
if g.Timeout > 0 {
|
||||
g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
|
||||
}
|
||||
checkEOF(g.conns[n])
|
||||
g.checkEOF(g.conns[n])
|
||||
if _, e := g.conns[n].Write(batch); e != nil {
|
||||
// Error
|
||||
log.Println("E! Graphite Error: " + e.Error())
|
||||
g.Log.Errorf("Graphite Error: " + e.Error())
|
||||
// Close explicitly
|
||||
g.conns[n].Close()
|
||||
// Let's try the next one
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package graphite
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"net"
|
||||
"net/textproto"
|
||||
"sync"
|
||||
|
|
@ -20,6 +21,7 @@ func TestGraphiteError(t *testing.T) {
|
|||
g := Graphite{
|
||||
Servers: []string{"127.0.0.1:12004", "127.0.0.1:12003"},
|
||||
Prefix: "my.prefix",
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
// Init metrics
|
||||
m1, _ := metric.New(
|
||||
|
|
@ -36,7 +38,7 @@ func TestGraphiteError(t *testing.T) {
|
|||
require.NoError(t, err1)
|
||||
err2 := g.Write(metrics)
|
||||
require.Error(t, err2)
|
||||
assert.Equal(t, "Could not write to any Graphite server in cluster\n", err2.Error())
|
||||
assert.Equal(t, "could not write to any Graphite server in cluster", err2.Error())
|
||||
}
|
||||
|
||||
func TestGraphiteOK(t *testing.T) {
|
||||
|
|
@ -50,6 +52,7 @@ func TestGraphiteOK(t *testing.T) {
|
|||
g := Graphite{
|
||||
Prefix: "my.prefix",
|
||||
Servers: []string{"localhost:12003"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
|
|
@ -111,6 +114,7 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
|
|||
Prefix: "my.prefix",
|
||||
GraphiteSeparator: ".",
|
||||
Servers: []string{"localhost:12003"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
|
|
@ -172,6 +176,7 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
|
|||
Prefix: "my.prefix",
|
||||
GraphiteSeparator: "_",
|
||||
Servers: []string{"localhost:12003"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
|
|
@ -237,6 +242,7 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
|
|||
"measurement.tags.host.field",
|
||||
},
|
||||
Servers: []string{"localhost:12003"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
|
|
@ -298,6 +304,7 @@ func TestGraphiteOkWithTags(t *testing.T) {
|
|||
Prefix: "my.prefix",
|
||||
GraphiteTagSupport: true,
|
||||
Servers: []string{"localhost:12003"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
|
|
@ -360,6 +367,7 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
|
|||
GraphiteTagSupport: true,
|
||||
GraphiteSeparator: ".",
|
||||
Servers: []string{"localhost:12003"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
|
|
@ -422,6 +430,7 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
|
|||
GraphiteTagSupport: true,
|
||||
GraphiteSeparator: "_",
|
||||
Servers: []string{"localhost:12003"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// Init metrics
|
||||
|
|
|
|||
|
|
@ -68,9 +68,8 @@ func asFloat(fv interface{}) (float64, bool) {
|
|||
case bool:
|
||||
if v {
|
||||
return 1.0, true
|
||||
} else {
|
||||
return 0.0, true
|
||||
}
|
||||
return 0.0, true
|
||||
default:
|
||||
return 0.0, false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
|
@ -75,8 +74,9 @@ type Health struct {
|
|||
BasicPassword string `toml:"basic_password"`
|
||||
tlsint.ServerConfig
|
||||
|
||||
Compares []*Compares `toml:"compares"`
|
||||
Contains []*Contains `toml:"contains"`
|
||||
Compares []*Compares `toml:"compares"`
|
||||
Contains []*Contains `toml:"contains"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
checkers []Checker
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
|
@ -153,14 +153,14 @@ func (h *Health) Connect() error {
|
|||
|
||||
h.origin = h.getOrigin(listener)
|
||||
|
||||
log.Printf("I! [outputs.health] Listening on %s", h.origin)
|
||||
h.Log.Infof("Listening on %s", h.origin)
|
||||
|
||||
h.wg.Add(1)
|
||||
go func() {
|
||||
defer h.wg.Done()
|
||||
err := h.server.Serve(listener)
|
||||
if err != http.ErrServerClosed {
|
||||
log.Printf("E! [outputs.health] Serve error on %s: %v", h.origin, err)
|
||||
h.Log.Errorf("Serve error on %s: %v", h.origin, err)
|
||||
}
|
||||
h.origin = ""
|
||||
}()
|
||||
|
|
@ -174,9 +174,8 @@ func onAuthError(_ http.ResponseWriter) {
|
|||
func (h *Health) listen() (net.Listener, error) {
|
||||
if h.tlsConf != nil {
|
||||
return tls.Listen(h.network, h.address, h.tlsConf)
|
||||
} else {
|
||||
return net.Listen(h.network, h.address)
|
||||
}
|
||||
return net.Listen(h.network, h.address)
|
||||
}
|
||||
|
||||
func (h *Health) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
|
|
|
|||
|
|
@ -106,6 +106,7 @@ func TestHealth(t *testing.T) {
|
|||
output.ServiceAddress = "tcp://127.0.0.1:0"
|
||||
output.Compares = tt.options.Compares
|
||||
output.Contains = tt.options.Contains
|
||||
output.Log = testutil.Logger{}
|
||||
|
||||
err := output.Init()
|
||||
require.NoError(t, err)
|
||||
|
|
@ -140,6 +141,7 @@ func TestInitServiceAddress(t *testing.T) {
|
|||
name: "port without scheme is not allowed",
|
||||
plugin: &health.Health{
|
||||
ServiceAddress: ":8080",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
err: true,
|
||||
},
|
||||
|
|
@ -147,6 +149,7 @@ func TestInitServiceAddress(t *testing.T) {
|
|||
name: "path without scheme is not allowed",
|
||||
plugin: &health.Health{
|
||||
ServiceAddress: "/tmp/telegraf",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
err: true,
|
||||
},
|
||||
|
|
@ -154,6 +157,7 @@ func TestInitServiceAddress(t *testing.T) {
|
|||
name: "tcp with port maps to http",
|
||||
plugin: &health.Health{
|
||||
ServiceAddress: "tcp://:8080",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -161,30 +165,35 @@ func TestInitServiceAddress(t *testing.T) {
|
|||
plugin: &health.Health{
|
||||
ServiceAddress: "tcp://:8080",
|
||||
ServerConfig: *pki.TLSServerConfig(),
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "tcp4 is allowed",
|
||||
plugin: &health.Health{
|
||||
ServiceAddress: "tcp4://:8080",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "tcp6 is allowed",
|
||||
plugin: &health.Health{
|
||||
ServiceAddress: "tcp6://:8080",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "http scheme",
|
||||
plugin: &health.Health{
|
||||
ServiceAddress: "http://:8080",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "https scheme",
|
||||
plugin: &health.Health{
|
||||
ServiceAddress: "https://:8080",
|
||||
Log: testutil.Logger{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -192,6 +201,7 @@ func TestInitServiceAddress(t *testing.T) {
|
|||
t.Run(tt.name, func(t *testing.T) {
|
||||
output := health.NewHealth()
|
||||
output.ServiceAddress = tt.plugin.ServiceAddress
|
||||
output.Log = testutil.Logger{}
|
||||
|
||||
err := output.Init()
|
||||
if tt.err {
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
|
@ -31,6 +30,8 @@ type Instrumental struct {
|
|||
Timeout internal.Duration
|
||||
Debug bool
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
|
|
@ -82,7 +83,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
|
|||
if i.conn == nil {
|
||||
err := i.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("FAILED to (re)connect to Instrumental. Error: %s\n", err)
|
||||
return fmt.Errorf("failed to (re)connect to Instrumental. Error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -111,7 +112,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
buf, err := s.Serialize(m)
|
||||
if err != nil {
|
||||
log.Printf("D! [outputs.instrumental] Could not serialize metric: %v", err)
|
||||
i.Log.Debugf("Could not serialize metric: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -187,7 +188,7 @@ func (i *Instrumental) authenticate(conn net.Conn) error {
|
|||
}
|
||||
|
||||
if string(responses)[:6] != "ok\nok\n" {
|
||||
return fmt.Errorf("Authentication failed: %s", responses)
|
||||
return fmt.Errorf("authentication failed: %s", responses)
|
||||
}
|
||||
|
||||
i.conn = conn
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"regexp"
|
||||
|
||||
|
|
@ -17,12 +16,13 @@ import (
|
|||
|
||||
// Librato structure for configuration and client
|
||||
type Librato struct {
|
||||
APIUser string `toml:"api_user"`
|
||||
APIToken string `toml:"api_token"`
|
||||
Debug bool
|
||||
SourceTag string // Deprecated, keeping for backward-compatibility
|
||||
Timeout internal.Duration
|
||||
Template string
|
||||
APIUser string `toml:"api_user"`
|
||||
APIToken string `toml:"api_token"`
|
||||
Debug bool `toml:"debug"`
|
||||
SourceTag string `toml:"source_tag"` // Deprecated, keeping for backward-compatibility
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
Template string `toml:"template"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
APIUrl string
|
||||
client *http.Client
|
||||
|
|
@ -89,7 +89,6 @@ func (l *Librato) Connect() error {
|
|||
}
|
||||
|
||||
func (l *Librato) Write(metrics []telegraf.Metric) error {
|
||||
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -106,11 +105,11 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
|
|||
if gauges, err := l.buildGauges(m); err == nil {
|
||||
for _, gauge := range gauges {
|
||||
tempGauges = append(tempGauges, gauge)
|
||||
log.Printf("D! Got a gauge: %v\n", gauge)
|
||||
l.Log.Debugf("Got a gauge: %v", gauge)
|
||||
}
|
||||
} else {
|
||||
log.Printf("I! unable to build Gauge for %s, skipping\n", m.Name())
|
||||
log.Printf("D! Couldn't build gauge: %v\n", err)
|
||||
l.Log.Infof("Unable to build Gauge for %s, skipping", m.Name())
|
||||
l.Log.Debugf("Couldn't build gauge: %v", err)
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -129,34 +128,32 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
|
|||
copy(lmetrics.Gauges, tempGauges[start:end])
|
||||
metricsBytes, err := json.Marshal(lmetrics)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to marshal Metrics, %s", err.Error())
|
||||
}
|
||||
|
||||
log.Printf("D! Librato request: %v\n", string(metricsBytes))
|
||||
l.Log.Debugf("Librato request: %v", string(metricsBytes))
|
||||
|
||||
req, err := http.NewRequest(
|
||||
"POST",
|
||||
l.APIUrl,
|
||||
bytes.NewBuffer(metricsBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"unable to create http.Request, %s\n",
|
||||
err.Error())
|
||||
return fmt.Errorf("unable to create http.Request, %s", err.Error())
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
req.SetBasicAuth(l.APIUser, l.APIToken)
|
||||
|
||||
resp, err := l.client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("D! Error POSTing metrics: %v\n", err.Error())
|
||||
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
||||
l.Log.Debugf("Error POSTing metrics: %v", err.Error())
|
||||
return fmt.Errorf("error POSTing metrics, %s", err.Error())
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 || l.Debug {
|
||||
htmlData, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("D! Couldn't get response! (%v)\n", err)
|
||||
l.Log.Debugf("Couldn't get response! (%v)", err)
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
return fmt.Errorf(
|
||||
|
|
@ -164,7 +161,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
|
|||
resp.StatusCode,
|
||||
string(htmlData))
|
||||
}
|
||||
log.Printf("D! Librato response: %v\n", string(htmlData))
|
||||
l.Log.Debugf("Librato response: %v", string(htmlData))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -183,7 +180,6 @@ func (l *Librato) Description() string {
|
|||
}
|
||||
|
||||
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
||||
|
||||
gauges := []*Gauge{}
|
||||
if m.Time().Unix() == 0 {
|
||||
return gauges, fmt.Errorf("time was zero %s", m.Name())
|
||||
|
|
@ -193,8 +189,7 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
|||
"value")
|
||||
if metricSource == "" {
|
||||
return gauges,
|
||||
fmt.Errorf("undeterminable Source type from Field, %s\n",
|
||||
l.Template)
|
||||
fmt.Errorf("undeterminable Source type from Field, %s", l.Template)
|
||||
}
|
||||
for fieldName, value := range m.Fields() {
|
||||
|
||||
|
|
@ -212,14 +207,12 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
|||
continue
|
||||
}
|
||||
if err := gauge.setValue(value); err != nil {
|
||||
return gauges, fmt.Errorf(
|
||||
"unable to extract value from Fields, %s\n",
|
||||
err.Error())
|
||||
return gauges, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
||||
}
|
||||
gauges = append(gauges, gauge)
|
||||
}
|
||||
|
||||
log.Printf("D! Built gauges: %v\n", gauges)
|
||||
l.Log.Debugf("Built gauges: %v", gauges)
|
||||
return gauges, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,19 +10,17 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
fakeURL = "http://test.librato.com"
|
||||
fakeUser = "telegraf@influxdb.com"
|
||||
fakeToken = "123456"
|
||||
fakeURL = "http://test.librato.com"
|
||||
)
|
||||
|
||||
func fakeLibrato() *Librato {
|
||||
l := NewLibrato(fakeURL)
|
||||
l.APIUser = fakeUser
|
||||
l.APIToken = fakeToken
|
||||
func newTestLibrato(testURL string) *Librato {
|
||||
l := NewLibrato(testURL)
|
||||
l.Log = testutil.Logger{}
|
||||
return l
|
||||
}
|
||||
|
||||
|
|
@ -34,7 +32,7 @@ func TestUriOverride(t *testing.T) {
|
|||
}))
|
||||
defer ts.Close()
|
||||
|
||||
l := NewLibrato(ts.URL)
|
||||
l := newTestLibrato(ts.URL)
|
||||
l.APIUser = "telegraf@influxdb.com"
|
||||
l.APIToken = "123456"
|
||||
err := l.Connect()
|
||||
|
|
@ -50,7 +48,7 @@ func TestBadStatusCode(t *testing.T) {
|
|||
}))
|
||||
defer ts.Close()
|
||||
|
||||
l := NewLibrato(ts.URL)
|
||||
l := newTestLibrato(ts.URL)
|
||||
l.APIUser = "telegraf@influxdb.com"
|
||||
l.APIToken = "123456"
|
||||
err := l.Connect()
|
||||
|
|
@ -140,7 +138,7 @@ func TestBuildGauge(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
l := NewLibrato(fakeURL)
|
||||
l := newTestLibrato(fakeURL)
|
||||
for _, gt := range gaugeTests {
|
||||
gauges, err := l.buildGauges(gt.ptIn)
|
||||
if err != nil && gt.err == nil {
|
||||
|
|
@ -257,7 +255,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
l := NewLibrato(fakeURL)
|
||||
l := newTestLibrato(fakeURL)
|
||||
for _, gt := range gaugeTests {
|
||||
l.Template = gt.template
|
||||
gauges, err := l.buildGauges(gt.ptIn)
|
||||
|
|
|
|||
|
|
@ -112,18 +112,18 @@ func (l *Logzio) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
serialized, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to marshal metric, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to marshal metric, %s", err.Error())
|
||||
}
|
||||
|
||||
_, err = gz.Write(append(serialized, '\n'))
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to write gzip meric, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to write gzip meric, %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
err := gz.Close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to close gzip, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to close gzip, %s", err.Error())
|
||||
}
|
||||
|
||||
return l.send(buff.Bytes())
|
||||
|
|
@ -132,19 +132,19 @@ func (l *Logzio) Write(metrics []telegraf.Metric) error {
|
|||
func (l *Logzio) send(metrics []byte) error {
|
||||
req, err := http.NewRequest("POST", l.authUrl(), bytes.NewBuffer(metrics))
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
|
||||
return fmt.Errorf("unable to create http.Request, %s", err.Error())
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
|
||||
resp, err := l.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
|
||||
return fmt.Errorf("error POSTing metrics, %s", err.Error())
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode > 209 {
|
||||
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
|
||||
return fmt.Errorf("received bad status code, %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package opentsdb
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"net/url"
|
||||
|
|
@ -28,17 +27,19 @@ var (
|
|||
)
|
||||
|
||||
type OpenTSDB struct {
|
||||
Prefix string
|
||||
Prefix string `toml:"prefix"`
|
||||
|
||||
Host string
|
||||
Port int
|
||||
Host string `toml:"host"`
|
||||
Port int `toml:"port"`
|
||||
|
||||
HttpBatchSize int // deprecated httpBatchSize form in 1.8
|
||||
HttpPath string
|
||||
HttpBatchSize int `toml:"http_batch_size"` // deprecated httpBatchSize form in 1.8
|
||||
HttpPath string `toml:"http_path"`
|
||||
|
||||
Debug bool
|
||||
Debug bool `toml:"debug"`
|
||||
|
||||
Separator string
|
||||
Separator string `toml:"separator"`
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
|
|
@ -86,7 +87,7 @@ func (o *OpenTSDB) Connect() error {
|
|||
// Test Connection to OpenTSDB Server
|
||||
u, err := url.Parse(o.Host)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error in parsing host url: %s", err.Error())
|
||||
return fmt.Errorf("error in parsing host url: %s", err.Error())
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
|
||||
|
|
@ -109,7 +110,7 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
u, err := url.Parse(o.Host)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error in parsing host url: %s", err.Error())
|
||||
return fmt.Errorf("error in parsing host url: %s", err.Error())
|
||||
}
|
||||
|
||||
if u.Scheme == "" || u.Scheme == "tcp" {
|
||||
|
|
@ -117,7 +118,7 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error {
|
|||
} else if u.Scheme == "http" || u.Scheme == "https" {
|
||||
return o.WriteHttp(metrics, u)
|
||||
} else {
|
||||
return fmt.Errorf("Unknown scheme in host parameter.")
|
||||
return fmt.Errorf("unknown scheme in host parameter")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -146,7 +147,7 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error {
|
|||
continue
|
||||
}
|
||||
default:
|
||||
log.Printf("D! OpenTSDB does not support metric value: [%s] of type [%T].\n", value, value)
|
||||
o.Log.Debugf("OpenTSDB does not support metric value: [%s] of type [%T].", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -195,13 +196,13 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
|
|||
continue
|
||||
}
|
||||
default:
|
||||
log.Printf("D! OpenTSDB does not support metric value: [%s] of type [%T].\n", value, value)
|
||||
o.Log.Debugf("OpenTSDB does not support metric value: [%s] of type [%T].", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
metricValue, buildError := buildValue(value)
|
||||
if buildError != nil {
|
||||
log.Printf("E! OpenTSDB: %s\n", buildError.Error())
|
||||
o.Log.Errorf("OpenTSDB: %s", buildError.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -183,9 +183,8 @@ func (p *PrometheusClient) Init() error {
|
|||
func (p *PrometheusClient) listen() (net.Listener, error) {
|
||||
if p.server.TLSConfig != nil {
|
||||
return tls.Listen("tcp", p.Listen, p.server.TLSConfig)
|
||||
} else {
|
||||
return net.Listen("tcp", p.Listen)
|
||||
}
|
||||
return net.Listen("tcp", p.Listen)
|
||||
}
|
||||
|
||||
func (p *PrometheusClient) Connect() error {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package riemann
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
|
|
@ -16,15 +15,16 @@ import (
|
|||
)
|
||||
|
||||
type Riemann struct {
|
||||
URL string
|
||||
TTL float32
|
||||
Separator string
|
||||
MeasurementAsAttribute bool
|
||||
StringAsState bool
|
||||
TagKeys []string
|
||||
Tags []string
|
||||
DescriptionText string
|
||||
Timeout internal.Duration
|
||||
URL string `toml:"url"`
|
||||
TTL float32 `toml:"ttl"`
|
||||
Separator string `toml:"separator"`
|
||||
MeasurementAsAttribute bool `toml:"measurement_as_attribute"`
|
||||
StringAsState bool `toml:"string_as_state"`
|
||||
TagKeys []string `toml:"tag_keys"`
|
||||
Tags []string `toml:"tags"`
|
||||
DescriptionText string `toml:"description_text"`
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
client *raidman.Client
|
||||
}
|
||||
|
|
@ -149,14 +149,14 @@ func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event {
|
|||
case string:
|
||||
// only send string metrics if explicitly enabled, skip otherwise
|
||||
if !r.StringAsState {
|
||||
log.Printf("D! Riemann event states disabled, skipping metric value [%s]\n", value)
|
||||
r.Log.Debugf("Riemann event states disabled, skipping metric value [%s]", value)
|
||||
continue
|
||||
}
|
||||
event.State = value.(string)
|
||||
case int, int64, uint64, float32, float64:
|
||||
event.Metric = value
|
||||
default:
|
||||
log.Printf("D! Riemann does not support metric value [%s]\n", value)
|
||||
r.Log.Debugf("Riemann does not support metric value [%s]", value)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package riemann
|
||||
|
||||
import (
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -12,7 +13,9 @@ import (
|
|||
func TestAttributes(t *testing.T) {
|
||||
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
|
||||
|
||||
r := &Riemann{}
|
||||
r := &Riemann{
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.Equal(t,
|
||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||
r.attributes("test", tags))
|
||||
|
|
@ -27,6 +30,7 @@ func TestAttributes(t *testing.T) {
|
|||
func TestService(t *testing.T) {
|
||||
r := &Riemann{
|
||||
Separator: "/",
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.Equal(t, "test/value", r.service("test", "value"))
|
||||
|
||||
|
|
@ -41,6 +45,7 @@ func TestTags(t *testing.T) {
|
|||
// all tag values plus additional tag should be present
|
||||
r := &Riemann{
|
||||
Tags: []string{"test"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.Equal(t,
|
||||
[]string{"test", "value1", "value2"},
|
||||
|
|
@ -67,6 +72,7 @@ func TestMetricEvents(t *testing.T) {
|
|||
MeasurementAsAttribute: false,
|
||||
DescriptionText: "metrics from telegraf",
|
||||
Tags: []string{"telegraf"},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// build a single event
|
||||
|
|
@ -126,6 +132,7 @@ func TestMetricEvents(t *testing.T) {
|
|||
func TestStateEvents(t *testing.T) {
|
||||
r := &Riemann{
|
||||
MeasurementAsAttribute: true,
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
// string metrics will be skipped unless explicitly enabled
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
|||
if r.client == nil {
|
||||
err := r.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err)
|
||||
return fmt.Errorf("failed to (re)connect to Riemann, error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -85,8 +85,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
|||
var senderr = r.client.SendMulti(events)
|
||||
if senderr != nil {
|
||||
r.Close() // always returns nil
|
||||
return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n",
|
||||
senderr)
|
||||
return fmt.Errorf("failed to send riemann message (will try to reconnect), error: %s", senderr)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ func (w *Wavefront) Connect() error {
|
|||
FlushIntervalSeconds: flushSeconds,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Url: %s", w.Url)
|
||||
return fmt.Errorf("could not create Wavefront Sender for Url: %s", w.Url)
|
||||
}
|
||||
w.sender = sender
|
||||
} else {
|
||||
|
|
@ -153,7 +153,7 @@ func (w *Wavefront) Connect() error {
|
|||
FlushIntervalSeconds: flushSeconds,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
|
||||
return fmt.Errorf("could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
|
||||
}
|
||||
w.sender = sender
|
||||
}
|
||||
|
|
@ -174,7 +174,7 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
|
|||
err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
||||
if err != nil {
|
||||
if isRetryable(err) {
|
||||
return fmt.Errorf("Wavefront sending error: %v", err)
|
||||
return fmt.Errorf("wavefront sending error: %v", err)
|
||||
}
|
||||
w.Log.Errorf("non-retryable error during Wavefront.Write: %v", err)
|
||||
w.Log.Debugf("Non-retryable metric data: Name: %v, Value: %v, Timestamp: %v, Source: %v, PointTags: %v ", point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
||||
|
|
@ -306,9 +306,8 @@ func buildValue(v interface{}, name string, w *Wavefront) (float64, error) {
|
|||
if w.ConvertBool {
|
||||
if p {
|
||||
return 1, nil
|
||||
} else {
|
||||
return 0, nil
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
case int64:
|
||||
return float64(v.(int64)), nil
|
||||
|
|
|
|||
|
|
@ -36,25 +36,22 @@ var (
|
|||
func unescape(b []byte) string {
|
||||
if bytes.ContainsAny(b, escapes) {
|
||||
return unescaper.Replace(unsafeBytesToString(b))
|
||||
} else {
|
||||
return string(b)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func nameUnescape(b []byte) string {
|
||||
if bytes.ContainsAny(b, nameEscapes) {
|
||||
return nameUnescaper.Replace(unsafeBytesToString(b))
|
||||
} else {
|
||||
return string(b)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func stringFieldUnescape(b []byte) string {
|
||||
if bytes.ContainsAny(b, stringFieldEscapes) {
|
||||
return stringFieldUnescaper.Replace(unsafeBytesToString(b))
|
||||
} else {
|
||||
return string(b)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
|
||||
|
|
|
|||
|
|
@@ -13,7 +13,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
. "github.com/influxdata/telegraf/plugins/parsers/prometheus/common"
"github.com/influxdata/telegraf/plugins/parsers/prometheus/common"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"

@@ -63,7 +63,7 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := MakeLabels(m, p.DefaultTags)
tags := common.MakeLabels(m, p.DefaultTags)

if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
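The import hunk above is the [rule.dot-imports] fix: the dot import of the prometheus common helpers is replaced with a normal import, so MakeLabels and ValueType become common.MakeLabels and common.ValueType at every call site. A generic before/after sketch using a standard-library package as a stand-in:

package main

// Before (flagged by revive): a dot import leaks identifiers into this file.
//   import . "strings"
//   func shout(s string) string { return ToUpper(s) }

// After: a qualified import keeps the origin of each identifier obvious.
import (
	"fmt"
	"strings"
)

func shout(s string) string { return strings.ToUpper(s) }

func main() { fmt.Println(shout("prometheus")) }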
|
||||
|
|
@ -81,7 +81,7 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
// converting to telegraf metric
|
||||
if len(fields) > 0 {
|
||||
t := getTimestamp(m, now)
|
||||
metric, err := metric.New("prometheus", tags, fields, t, ValueType(mf.GetType()))
|
||||
metric, err := metric.New("prometheus", tags, fields, t, common.ValueType(mf.GetType()))
|
||||
if err == nil {
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
|
@ -100,11 +100,11 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
|||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, fmt.Errorf("No metrics in line")
|
||||
return nil, fmt.Errorf("no metrics in line")
|
||||
}
|
||||
|
||||
if len(metrics) > 1 {
|
||||
return nil, fmt.Errorf("More than one metric in line")
|
||||
return nil, fmt.Errorf("more than one metric in line")
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
|
|
@ -122,7 +122,7 @@ func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, met
|
|||
|
||||
fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount())
|
||||
fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum())
|
||||
met, err := metric.New("prometheus", tags, fields, t, ValueType(metricType))
|
||||
met, err := metric.New("prometheus", tags, fields, t, common.ValueType(metricType))
|
||||
if err == nil {
|
||||
metrics = append(metrics, met)
|
||||
}
|
||||
|
|
@ -134,7 +134,7 @@ func makeQuantiles(m *dto.Metric, tags map[string]string, metricName string, met
|
|||
newTags["quantile"] = fmt.Sprint(q.GetQuantile())
|
||||
fields[metricName] = float64(q.GetValue())
|
||||
|
||||
quantileMetric, err := metric.New("prometheus", newTags, fields, t, ValueType(metricType))
|
||||
quantileMetric, err := metric.New("prometheus", newTags, fields, t, common.ValueType(metricType))
|
||||
if err == nil {
|
||||
metrics = append(metrics, quantileMetric)
|
||||
}
|
||||
|
|
@ -151,7 +151,7 @@ func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metri
|
|||
fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount())
|
||||
fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum())
|
||||
|
||||
met, err := metric.New("prometheus", tags, fields, t, ValueType(metricType))
|
||||
met, err := metric.New("prometheus", tags, fields, t, common.ValueType(metricType))
|
||||
if err == nil {
|
||||
metrics = append(metrics, met)
|
||||
}
|
||||
|
|
@ -162,7 +162,7 @@ func makeBuckets(m *dto.Metric, tags map[string]string, metricName string, metri
|
|||
newTags["le"] = fmt.Sprint(b.GetUpperBound())
|
||||
fields[metricName+"_bucket"] = float64(b.GetCumulativeCount())
|
||||
|
||||
histogramMetric, err := metric.New("prometheus", newTags, fields, t, ValueType(metricType))
|
||||
histogramMetric, err := metric.New("prometheus", newTags, fields, t, common.ValueType(metricType))
|
||||
if err == nil {
|
||||
metrics = append(metrics, histogramMetric)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package all
|
||||
|
||||
import (
|
||||
//Blank imports for plugins to register themselves
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/aws/ec2"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/clone"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/converter"
|
||||
|
|
|
|||
|
|
@ -349,9 +349,8 @@ func toInteger(v interface{}) (int64, bool) {
|
|||
case uint64:
|
||||
if value <= uint64(math.MaxInt64) {
|
||||
return int64(value), true
|
||||
} else {
|
||||
return math.MaxInt64, true
|
||||
}
|
||||
return math.MaxInt64, true
|
||||
case float64:
|
||||
if value < float64(math.MinInt64) {
|
||||
return math.MinInt64, true
|
||||
|
|
@ -363,9 +362,8 @@ func toInteger(v interface{}) (int64, bool) {
|
|||
case bool:
|
||||
if value {
|
||||
return 1, true
|
||||
} else {
|
||||
return 0, true
|
||||
}
|
||||
return 0, true
|
||||
case string:
|
||||
result, err := strconv.ParseInt(value, 0, 64)
|
||||
|
||||
|
|
@ -388,9 +386,8 @@ func toUnsigned(v interface{}) (uint64, bool) {
|
|||
case int64:
|
||||
if value < 0 {
|
||||
return 0, true
|
||||
} else {
|
||||
return uint64(value), true
|
||||
}
|
||||
return uint64(value), true
|
||||
case float64:
|
||||
if value < 0.0 {
|
||||
return 0, true
|
||||
|
|
@ -402,9 +399,8 @@ func toUnsigned(v interface{}) (uint64, bool) {
|
|||
case bool:
|
||||
if value {
|
||||
return 1, true
|
||||
} else {
|
||||
return 0, true
|
||||
}
|
||||
return 0, true
|
||||
case string:
|
||||
result, err := strconv.ParseUint(value, 0, 64)
|
||||
|
||||
|
|
@ -431,9 +427,8 @@ func toFloat(v interface{}) (float64, bool) {
|
|||
case bool:
|
||||
if value {
|
||||
return 1.0, true
|
||||
} else {
|
||||
return 0.0, true
|
||||
}
|
||||
return 0.0, true
|
||||
case string:
|
||||
result, err := strconv.ParseFloat(value, 64)
|
||||
return result, err == nil
|
||||
|
|
|
|||
|
|
@ -268,9 +268,8 @@ func (d *IfName) getMap(agent string) (entry nameMap, age time.Duration, err err
|
|||
d.rwLock.RUnlock()
|
||||
if ok {
|
||||
return m, age, nil
|
||||
} else {
|
||||
return nil, 0, fmt.Errorf("getting remote table from cache")
|
||||
}
|
||||
return nil, 0, fmt.Errorf("getting remote table from cache")
|
||||
}
|
||||
|
||||
// The cache missed and this is the first request for this
|
||||
|
|
|
|||
|
|
@ -43,10 +43,10 @@ func (c *TTLCache) Get(key keyType) (valType, bool, time.Duration) {
|
|||
age := c.now().Sub(v.time)
|
||||
if age < c.validDuration {
|
||||
return v.val, ok, age
|
||||
} else {
|
||||
c.lru.Delete(key)
|
||||
return valType{}, false, 0
|
||||
}
|
||||
|
||||
c.lru.Delete(key)
|
||||
return valType{}, false, 0
|
||||
}
|
||||
|
||||
func (c *TTLCache) Put(key keyType, value valType) {
|
||||
|
|
|
|||
|
|
@ -287,9 +287,9 @@ func (s *Strings) initOnce() {
|
|||
newString := strings.Replace(s, c.Old, c.New, -1)
|
||||
if newString == "" {
|
||||
return s
|
||||
} else {
|
||||
return newString
|
||||
}
|
||||
|
||||
return newString
|
||||
}
|
||||
s.converters = append(s.converters, c)
|
||||
}
|
||||
|
|
@ -298,9 +298,9 @@ func (s *Strings) initOnce() {
|
|||
c.fn = func(s string) string {
|
||||
if len(s) < c.Width {
|
||||
return s
|
||||
} else {
|
||||
return s[:c.Width]
|
||||
}
|
||||
|
||||
return s[:c.Width]
|
||||
}
|
||||
s.converters = append(s.converters, c)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package topk
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"sort"
|
||||
"time"
|
||||
|
|
@ -15,15 +14,16 @@ import (
|
|||
)
|
||||
|
||||
type TopK struct {
|
||||
Period internal.Duration
|
||||
K int
|
||||
GroupBy []string `toml:"group_by"`
|
||||
Fields []string
|
||||
Aggregation string
|
||||
Bottomk bool
|
||||
AddGroupByTag string `toml:"add_groupby_tag"`
|
||||
AddRankFields []string `toml:"add_rank_fields"`
|
||||
AddAggregateFields []string `toml:"add_aggregate_fields"`
|
||||
Period internal.Duration `toml:"period"`
|
||||
K int `toml:"k"`
|
||||
GroupBy []string `toml:"group_by"`
|
||||
Fields []string `toml:"fields"`
|
||||
Aggregation string `toml:"aggregation"`
|
||||
Bottomk bool `toml:"bottomk"`
|
||||
AddGroupByTag string `toml:"add_groupby_tag"`
|
||||
AddRankFields []string `toml:"add_rank_fields"`
|
||||
AddAggregateFields []string `toml:"add_aggregate_fields"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
cache map[string][]telegraf.Metric
|
||||
tagsGlobs filter.Filter
|
||||
|
|
@ -112,9 +112,8 @@ func sortMetrics(metrics []MetricAggregation, field string, reverse bool) {
|
|||
jv := metrics[j].values[field]
|
||||
if iv < jv {
|
||||
return true
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
if reverse {
|
||||
|
|
@ -174,7 +173,7 @@ func (t *TopK) groupBy(m telegraf.Metric) {
|
|||
if err != nil {
|
||||
// If we could not generate the groupkey, fail hard
|
||||
// by dropping this and all subsequent metrics
|
||||
log.Printf("E! [processors.topk]: could not generate group key: %v", err)
|
||||
t.Log.Errorf("Could not generate group key: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -269,7 +268,7 @@ func (t *TopK) push() []telegraf.Metric {
|
|||
if err != nil {
|
||||
// If we could not generate the aggregation
|
||||
// function, fail hard by dropping all metrics
|
||||
log.Printf("E! [processors.topk]: %v", err)
|
||||
t.Log.Errorf("%v", err)
|
||||
return []telegraf.Metric{}
|
||||
}
|
||||
for k, ms := range t.cache {
|
||||
|
|
@ -342,7 +341,7 @@ func (t *TopK) getAggregationFunction(aggOperation string) (func([]telegraf.Metr
|
|||
}
|
||||
val, ok := convert(fieldVal)
|
||||
if !ok {
|
||||
log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'",
|
||||
t.Log.Infof("Cannot convert value '%s' from metric '%s' with tags '%s'",
|
||||
m.Fields()[field], m.Name(), m.Tags())
|
||||
continue
|
||||
}
|
||||
|
|
@ -408,7 +407,7 @@ func (t *TopK) getAggregationFunction(aggOperation string) (func([]telegraf.Metr
|
|||
}
|
||||
val, ok := convert(fieldVal)
|
||||
if !ok {
|
||||
log.Printf("Cannot convert value '%s' from metric '%s' with tags '%s'",
|
||||
t.Log.Infof("Cannot convert value '%s' from metric '%s' with tags '%s'",
|
||||
m.Fields()[field], m.Name(), m.Tags())
|
||||
continue
|
||||
}
|
||||
|
|
@ -434,7 +433,7 @@ func (t *TopK) getAggregationFunction(aggOperation string) (func([]telegraf.Metr
|
|||
}, nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Unknown aggregation function '%s'. No metrics will be processed", t.Aggregation)
|
||||
return nil, fmt.Errorf("unknown aggregation function '%s', no metrics will be processed", t.Aggregation)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -119,9 +119,8 @@ func formatValue(value interface{}) string {
|
|||
case bool:
|
||||
if v {
|
||||
return "1"
|
||||
} else {
|
||||
return "0"
|
||||
}
|
||||
return "0"
|
||||
case uint64:
|
||||
return strconv.FormatUint(v, 10)
|
||||
case int64:
|
||||
|
|
@ -214,11 +213,11 @@ func InitGraphiteTemplates(templates []string) ([]*GraphiteTemplate, string, err
|
|||
if len(parts) == 1 {
|
||||
if parts[0] == "" {
|
||||
return nil, "", fmt.Errorf("missing template at position: %d", i)
|
||||
} else {
|
||||
// Override default template
|
||||
defaultTemplate = t
|
||||
continue
|
||||
}
|
||||
|
||||
// Override default template
|
||||
defaultTemplate = t
|
||||
continue
|
||||
}
|
||||
|
||||
if len(parts) > 2 {
|
||||
|
|
|
|||
|
|
@ -38,25 +38,22 @@ var (
|
|||
func escape(s string) string {
|
||||
if strings.ContainsAny(s, escapes) {
|
||||
return escaper.Replace(s)
|
||||
} else {
|
||||
return s
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Escape a measurement name
|
||||
func nameEscape(s string) string {
|
||||
if strings.ContainsAny(s, nameEscapes) {
|
||||
return nameEscaper.Replace(s)
|
||||
} else {
|
||||
return s
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Escape a string field
|
||||
func stringFieldEscape(s string) string {
|
||||
if strings.ContainsAny(s, stringFieldEscapes) {
|
||||
return stringFieldEscaper.Replace(s)
|
||||
} else {
|
||||
return s
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
|
|
|||
|
|
@ -302,13 +302,11 @@ func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, er
|
|||
case uint64:
|
||||
if s.fieldTypeSupport&UintSupport != 0 {
|
||||
return appendUintField(buf, v), nil
|
||||
} else {
|
||||
if v <= uint64(MaxInt64) {
|
||||
return appendIntField(buf, int64(v)), nil
|
||||
} else {
|
||||
return appendIntField(buf, int64(MaxInt64)), nil
|
||||
}
|
||||
}
|
||||
if v <= uint64(MaxInt64) {
|
||||
return appendIntField(buf, int64(v)), nil
|
||||
}
|
||||
return appendIntField(buf, MaxInt64), nil
|
||||
case int64:
|
||||
return appendIntField(buf, v), nil
|
||||
case float64:
|
||||
|
|
|
|||