2017-09-30 07:13:08 +08:00
|
|
|
package wavefront
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"regexp"
|
|
|
|
|
"strings"
|
|
|
|
|
|
2021-11-25 03:33:45 +08:00
|
|
|
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
"github.com/influxdata/telegraf"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
|
|
|
)
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
const maxTagLength = 254
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
type Wavefront struct {
|
2021-03-02 05:04:35 +08:00
|
|
|
URL string `toml:"url"`
|
|
|
|
|
Token string `toml:"token"`
|
|
|
|
|
Host string `toml:"host"`
|
|
|
|
|
Port int `toml:"port"`
|
|
|
|
|
Prefix string `toml:"prefix"`
|
|
|
|
|
SimpleFields bool `toml:"simple_fields"`
|
|
|
|
|
MetricSeparator string `toml:"metric_separator"`
|
|
|
|
|
ConvertPaths bool `toml:"convert_paths"`
|
|
|
|
|
ConvertBool bool `toml:"convert_bool"`
|
|
|
|
|
UseRegex bool `toml:"use_regex"`
|
|
|
|
|
UseStrict bool `toml:"use_strict"`
|
|
|
|
|
TruncateTags bool `toml:"truncate_tags"`
|
|
|
|
|
ImmediateFlush bool `toml:"immediate_flush"`
|
|
|
|
|
SourceOverride []string `toml:"source_override"`
|
2022-03-02 06:05:53 +08:00
|
|
|
StringToNumber map[string][]map[string]float64 `toml:"string_to_number" deprecated:"1.9.0;use the enum processor instead"`
|
2018-12-22 03:26:07 +08:00
|
|
|
|
|
|
|
|
sender wavefront.Sender
|
2021-03-02 05:04:35 +08:00
|
|
|
Log telegraf.Logger `toml:"-"`
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// catch many of the invalid chars that could appear in a metric or tag name
|
|
|
|
|
var sanitizedChars = strings.NewReplacer(
|
|
|
|
|
"!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-",
|
|
|
|
|
"*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-",
|
|
|
|
|
"[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-",
|
|
|
|
|
">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-",
|
|
|
|
|
"=", "-",
|
|
|
|
|
)
|
|
|
|
|
|
2019-04-03 02:47:25 +08:00
|
|
|
// catch many of the invalid chars that could appear in a metric or tag name
|
|
|
|
|
var strictSanitizedChars = strings.NewReplacer(
|
|
|
|
|
"!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-",
|
|
|
|
|
"*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-",
|
|
|
|
|
"[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-",
|
|
|
|
|
">", "-", "?", "-", "\\", "-", "|", "-", " ", "-", "=", "-",
|
|
|
|
|
)
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer
|
2021-11-25 03:33:45 +08:00
|
|
|
var sanitizedRegex = regexp.MustCompile(`[^a-zA-Z\d_.-]`)
|
2017-09-30 07:13:08 +08:00
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
var tagValueReplacer = strings.NewReplacer("*", "-")
|
2017-09-30 07:13:08 +08:00
|
|
|
|
|
|
|
|
var pathReplacer = strings.NewReplacer("_", "_")
|
|
|
|
|
|
|
|
|
|
var sampleConfig = `
|
2022-03-08 00:25:48 +08:00
|
|
|
## Url for Wavefront Direct Ingestion. For Wavefront Proxy Ingestion, see
|
|
|
|
|
## the 'host' and 'port' optioins below.
|
2018-12-22 03:26:07 +08:00
|
|
|
url = "https://metrics.wavefront.com"
|
|
|
|
|
|
|
|
|
|
## Authentication Token for Wavefront. Only required if using Direct Ingestion
|
|
|
|
|
#token = "DUMMY_TOKEN"
|
|
|
|
|
|
|
|
|
|
## DNS name of the wavefront proxy server. Do not use if url is specified
|
|
|
|
|
#host = "wavefront.example.com"
|
2017-09-30 07:13:08 +08:00
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
## Port that the Wavefront proxy server listens on. Do not use if url is specified
|
|
|
|
|
#port = 2878
|
2017-09-30 07:13:08 +08:00
|
|
|
|
|
|
|
|
## prefix for metrics keys
|
|
|
|
|
#prefix = "my.specific.prefix."
|
|
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
## whether to use "value" for name of simple fields. default is false
|
2017-09-30 07:13:08 +08:00
|
|
|
#simple_fields = false
|
|
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
## character to use between metric and field name. default is . (dot)
|
2017-09-30 07:13:08 +08:00
|
|
|
#metric_separator = "."
|
|
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
## Convert metric name paths to use metricSeparator character
|
|
|
|
|
## When true will convert all _ (underscore) characters in final metric name. default is true
|
2017-09-30 07:13:08 +08:00
|
|
|
#convert_paths = true
|
|
|
|
|
|
2019-04-03 02:47:25 +08:00
|
|
|
## Use Strict rules to sanitize metric and tag names from invalid characters
|
2020-05-14 15:41:58 +08:00
|
|
|
## When enabled forward slash (/) and comma (,) will be accepted
|
2019-04-03 02:47:25 +08:00
|
|
|
#use_strict = false
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
## Use Regex to sanitize metric and tag names from invalid characters
|
2018-12-22 03:26:07 +08:00
|
|
|
## Regex is more thorough, but significantly slower. default is false
|
2017-09-30 07:13:08 +08:00
|
|
|
#use_regex = false
|
|
|
|
|
|
|
|
|
|
## point tags to use as the source name for Wavefront (if none found, host will be used)
|
2018-12-22 03:26:07 +08:00
|
|
|
#source_override = ["hostname", "address", "agent_host", "node_host"]
|
2017-09-30 07:13:08 +08:00
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true
|
2017-09-30 07:13:08 +08:00
|
|
|
#convert_bool = true
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any
|
|
|
|
|
## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility.
|
|
|
|
|
#truncate_tags = false
|
|
|
|
|
|
2020-11-03 12:12:48 +08:00
|
|
|
## Flush the internal buffers after each batch. This effectively bypasses the background sending of metrics
|
|
|
|
|
## normally done by the Wavefront SDK. This can be used if you are experiencing buffer overruns. The sending
|
|
|
|
|
## of metrics will block for a longer time, but this will be handled gracefully by the internal buffering in
|
|
|
|
|
## Telegraf.
|
|
|
|
|
#immediate_flush = true
|
2017-09-30 07:13:08 +08:00
|
|
|
`
|
|
|
|
|
|
|
|
|
|
type MetricPoint struct {
|
|
|
|
|
Metric string
|
|
|
|
|
Value float64
|
|
|
|
|
Timestamp int64
|
|
|
|
|
Source string
|
|
|
|
|
Tags map[string]string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Wavefront) Connect() error {
|
2020-11-03 12:12:48 +08:00
|
|
|
flushSeconds := 5
|
|
|
|
|
if w.ImmediateFlush {
|
|
|
|
|
flushSeconds = 86400 // Set a very long flush interval if we're flushing directly
|
|
|
|
|
}
|
2021-03-02 05:04:35 +08:00
|
|
|
if w.URL != "" {
|
|
|
|
|
w.Log.Debug("connecting over http/https using Url: %s", w.URL)
|
2018-12-22 03:26:07 +08:00
|
|
|
sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{
|
2021-03-02 05:04:35 +08:00
|
|
|
Server: w.URL,
|
2018-12-22 03:26:07 +08:00
|
|
|
Token: w.Token,
|
2020-11-03 12:12:48 +08:00
|
|
|
FlushIntervalSeconds: flushSeconds,
|
2018-12-22 03:26:07 +08:00
|
|
|
})
|
|
|
|
|
if err != nil {
|
2021-03-02 05:04:35 +08:00
|
|
|
return fmt.Errorf("could not create Wavefront Sender for Url: %s", w.URL)
|
2018-12-22 03:26:07 +08:00
|
|
|
}
|
|
|
|
|
w.sender = sender
|
|
|
|
|
} else {
|
2020-08-07 22:12:14 +08:00
|
|
|
w.Log.Debugf("connecting over tcp using Host: %q and Port: %d", w.Host, w.Port)
|
2018-12-22 03:26:07 +08:00
|
|
|
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
|
|
|
|
|
Host: w.Host,
|
|
|
|
|
MetricsPort: w.Port,
|
2020-11-03 12:12:48 +08:00
|
|
|
FlushIntervalSeconds: flushSeconds,
|
2018-12-22 03:26:07 +08:00
|
|
|
})
|
|
|
|
|
if err != nil {
|
2021-02-09 00:18:40 +08:00
|
|
|
return fmt.Errorf("could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
|
2018-12-22 03:26:07 +08:00
|
|
|
}
|
|
|
|
|
w.sender = sender
|
|
|
|
|
}
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
if w.ConvertPaths && w.MetricSeparator == "_" {
|
|
|
|
|
w.ConvertPaths = false
|
|
|
|
|
}
|
|
|
|
|
if w.ConvertPaths {
|
|
|
|
|
pathReplacer = strings.NewReplacer("_", w.MetricSeparator)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Wavefront) Write(metrics []telegraf.Metric) error {
|
|
|
|
|
for _, m := range metrics {
|
2020-05-14 03:02:39 +08:00
|
|
|
for _, point := range w.buildMetrics(m) {
|
2018-12-22 03:26:07 +08:00
|
|
|
err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
2017-09-30 07:13:08 +08:00
|
|
|
if err != nil {
|
2020-11-14 06:08:05 +08:00
|
|
|
if isRetryable(err) {
|
2021-12-15 06:07:10 +08:00
|
|
|
if flushErr := w.sender.Flush(); flushErr != nil {
|
|
|
|
|
w.Log.Errorf("wavefront flushing error: %v", flushErr)
|
|
|
|
|
}
|
2021-12-15 06:43:37 +08:00
|
|
|
return fmt.Errorf("wavefront sending error: %v", err)
|
2020-11-14 06:08:05 +08:00
|
|
|
}
|
|
|
|
|
w.Log.Errorf("non-retryable error during Wavefront.Write: %v", err)
|
2020-11-16 22:54:58 +08:00
|
|
|
w.Log.Debugf("Non-retryable metric data: Name: %v, Value: %v, Timestamp: %v, Source: %v, PointTags: %v ", point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2020-11-03 12:12:48 +08:00
|
|
|
if w.ImmediateFlush {
|
|
|
|
|
w.Log.Debugf("Flushing batch of %d points", len(metrics))
|
|
|
|
|
return w.sender.Flush()
|
|
|
|
|
}
|
2017-09-30 07:13:08 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
func (w *Wavefront) buildMetrics(m telegraf.Metric) []*MetricPoint {
|
2017-09-30 07:13:08 +08:00
|
|
|
ret := []*MetricPoint{}
|
|
|
|
|
|
|
|
|
|
for fieldName, value := range m.Fields() {
|
|
|
|
|
var name string
|
|
|
|
|
if !w.SimpleFields && fieldName == "value" {
|
|
|
|
|
name = fmt.Sprintf("%s%s", w.Prefix, m.Name())
|
|
|
|
|
} else {
|
|
|
|
|
name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if w.UseRegex {
|
|
|
|
|
name = sanitizedRegex.ReplaceAllLiteralString(name, "-")
|
2019-04-03 02:47:25 +08:00
|
|
|
} else if w.UseStrict {
|
|
|
|
|
name = strictSanitizedChars.Replace(name)
|
2017-09-30 07:13:08 +08:00
|
|
|
} else {
|
|
|
|
|
name = sanitizedChars.Replace(name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if w.ConvertPaths {
|
|
|
|
|
name = pathReplacer.Replace(name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metric := &MetricPoint{
|
|
|
|
|
Metric: name,
|
2018-03-28 08:30:51 +08:00
|
|
|
Timestamp: m.Time().Unix(),
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metricValue, buildError := buildValue(value, metric.Metric, w)
|
|
|
|
|
if buildError != nil {
|
2020-08-27 00:58:28 +08:00
|
|
|
w.Log.Debugf("Error building tags: %s\n", buildError.Error())
|
2017-09-30 07:13:08 +08:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
metric.Value = metricValue
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
source, tags := w.buildTags(m.Tags())
|
2017-09-30 07:13:08 +08:00
|
|
|
metric.Source = source
|
|
|
|
|
metric.Tags = tags
|
|
|
|
|
|
|
|
|
|
ret = append(ret, metric)
|
|
|
|
|
}
|
|
|
|
|
return ret
|
|
|
|
|
}
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
func (w *Wavefront) buildTags(mTags map[string]string) (string, map[string]string) {
|
2018-06-12 05:54:08 +08:00
|
|
|
// Remove all empty tags.
|
|
|
|
|
for k, v := range mTags {
|
|
|
|
|
if v == "" {
|
|
|
|
|
delete(mTags, k)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
// find source, use source_override property if needed
|
2017-09-30 07:13:08 +08:00
|
|
|
var source string
|
2018-08-14 07:37:06 +08:00
|
|
|
if s, ok := mTags["source"]; ok {
|
|
|
|
|
source = s
|
|
|
|
|
delete(mTags, "source")
|
|
|
|
|
} else {
|
|
|
|
|
sourceTagFound := false
|
|
|
|
|
for _, s := range w.SourceOverride {
|
|
|
|
|
for k, v := range mTags {
|
|
|
|
|
if k == s {
|
|
|
|
|
source = v
|
|
|
|
|
mTags["telegraf_host"] = mTags["host"]
|
|
|
|
|
sourceTagFound = true
|
|
|
|
|
delete(mTags, k)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if sourceTagFound {
|
2017-09-30 07:13:08 +08:00
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
2018-08-14 07:37:06 +08:00
|
|
|
|
|
|
|
|
if !sourceTagFound {
|
|
|
|
|
source = mTags["host"]
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
}
|
2018-12-22 03:26:07 +08:00
|
|
|
source = tagValueReplacer.Replace(source)
|
2017-09-30 07:13:08 +08:00
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
// remove default host tag
|
2017-09-30 07:13:08 +08:00
|
|
|
delete(mTags, "host")
|
|
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
// sanitize tag keys and values
|
|
|
|
|
tags := make(map[string]string)
|
|
|
|
|
for k, v := range mTags {
|
|
|
|
|
var key string
|
|
|
|
|
if w.UseRegex {
|
|
|
|
|
key = sanitizedRegex.ReplaceAllLiteralString(k, "-")
|
2019-04-03 02:47:25 +08:00
|
|
|
} else if w.UseStrict {
|
|
|
|
|
key = strictSanitizedChars.Replace(k)
|
2018-12-22 03:26:07 +08:00
|
|
|
} else {
|
|
|
|
|
key = sanitizedChars.Replace(k)
|
|
|
|
|
}
|
|
|
|
|
val := tagValueReplacer.Replace(v)
|
2020-05-14 03:02:39 +08:00
|
|
|
if w.TruncateTags {
|
|
|
|
|
if len(key) > maxTagLength {
|
|
|
|
|
w.Log.Warnf("Tag key length > 254. Skipping tag: %s", key)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if len(key)+len(val) > maxTagLength {
|
|
|
|
|
w.Log.Debugf("Key+value length > 254: %s", key)
|
|
|
|
|
val = val[:maxTagLength-len(key)]
|
|
|
|
|
}
|
|
|
|
|
}
|
2018-12-22 03:26:07 +08:00
|
|
|
tags[key] = val
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return source, tags
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func buildValue(v interface{}, name string, w *Wavefront) (float64, error) {
|
|
|
|
|
switch p := v.(type) {
|
|
|
|
|
case bool:
|
|
|
|
|
if w.ConvertBool {
|
|
|
|
|
if p {
|
|
|
|
|
return 1, nil
|
|
|
|
|
}
|
2021-02-09 00:18:40 +08:00
|
|
|
return 0, nil
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
case int64:
|
|
|
|
|
return float64(v.(int64)), nil
|
|
|
|
|
case uint64:
|
|
|
|
|
return float64(v.(uint64)), nil
|
|
|
|
|
case float64:
|
|
|
|
|
return v.(float64), nil
|
|
|
|
|
case string:
|
|
|
|
|
for prefix, mappings := range w.StringToNumber {
|
|
|
|
|
if strings.HasPrefix(name, prefix) {
|
|
|
|
|
for _, mapping := range mappings {
|
2021-03-24 23:27:46 +08:00
|
|
|
val, hasVal := mapping[p]
|
2017-09-30 07:13:08 +08:00
|
|
|
if hasVal {
|
|
|
|
|
return val, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
|
|
|
default:
|
|
|
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
|
|
|
}
|
|
|
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Wavefront) SampleConfig() string {
|
|
|
|
|
return sampleConfig
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Wavefront) Description() string {
|
|
|
|
|
return "Configuration for Wavefront server to send metrics to"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Wavefront) Close() error {
|
2018-12-22 03:26:07 +08:00
|
|
|
w.sender.Close()
|
2017-09-30 07:13:08 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
outputs.Add("wavefront", func() telegraf.Output {
|
|
|
|
|
return &Wavefront{
|
2018-12-22 03:26:07 +08:00
|
|
|
Token: "DUMMY_TOKEN",
|
2017-09-30 07:13:08 +08:00
|
|
|
MetricSeparator: ".",
|
|
|
|
|
ConvertPaths: true,
|
|
|
|
|
ConvertBool: true,
|
2020-05-14 03:02:39 +08:00
|
|
|
TruncateTags: false,
|
2020-11-03 12:12:48 +08:00
|
|
|
ImmediateFlush: true,
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
2020-11-14 06:08:05 +08:00
|
|
|
|
|
|
|
|
// TODO: Currently there's no canonical way to exhaust all
|
|
|
|
|
// retryable/non-retryable errors from wavefront, so this implementation just
|
|
|
|
|
// handles known non-retryable errors in a case-by-case basis and assumes all
|
|
|
|
|
// other errors are retryable.
|
|
|
|
|
// A support ticket has been filed against wavefront to provide a canonical way
|
|
|
|
|
// to distinguish between retryable and non-retryable errors (link is not
|
|
|
|
|
// public).
|
|
|
|
|
func isRetryable(err error) bool {
|
|
|
|
|
if err != nil {
|
|
|
|
|
// "empty metric name" errors are non-retryable as retry will just keep
|
|
|
|
|
// getting the same error again and again.
|
|
|
|
|
if strings.Contains(err.Error(), "empty metric name") {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|