2022-05-25 22:48:59 +08:00
|
|
|
//go:generate ../../../tools/readme_config_includer/generator
|
2017-09-30 07:13:08 +08:00
|
|
|
package wavefront
|
|
|
|
|
|
|
|
|
|
import (
|
2022-05-25 22:48:59 +08:00
|
|
|
_ "embed"
|
2017-09-30 07:13:08 +08:00
|
|
|
"fmt"
|
2022-08-02 05:27:56 +08:00
|
|
|
"net/url"
|
2017-09-30 07:13:08 +08:00
|
|
|
"regexp"
|
|
|
|
|
"strings"
|
|
|
|
|
|
2021-11-25 03:33:45 +08:00
|
|
|
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
"github.com/influxdata/telegraf"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
|
|
|
)
|
|
|
|
|
|
2022-05-25 22:48:59 +08:00
|
|
|
//go:embed sample.conf
|
|
|
|
|
var sampleConfig string
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
const maxTagLength = 254
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
type Wavefront struct {
|
2022-06-30 03:27:56 +08:00
|
|
|
URL string `toml:"url"`
|
|
|
|
|
Token string `toml:"token"`
|
2022-08-02 05:27:56 +08:00
|
|
|
Host string `toml:"host" deprecated:"2.4.0;use url instead"`
|
|
|
|
|
Port int `toml:"port" deprecated:"2.4.0;use url instead"`
|
2022-06-30 03:27:56 +08:00
|
|
|
Prefix string `toml:"prefix"`
|
|
|
|
|
SimpleFields bool `toml:"simple_fields"`
|
|
|
|
|
MetricSeparator string `toml:"metric_separator"`
|
|
|
|
|
ConvertPaths bool `toml:"convert_paths"`
|
|
|
|
|
ConvertBool bool `toml:"convert_bool"`
|
|
|
|
|
HTTPMaximumBatchSize int `toml:"http_maximum_batch_size"`
|
|
|
|
|
UseRegex bool `toml:"use_regex"`
|
|
|
|
|
UseStrict bool `toml:"use_strict"`
|
|
|
|
|
TruncateTags bool `toml:"truncate_tags"`
|
|
|
|
|
ImmediateFlush bool `toml:"immediate_flush"`
|
|
|
|
|
SourceOverride []string `toml:"source_override"`
|
|
|
|
|
StringToNumber map[string][]map[string]float64 `toml:"string_to_number" deprecated:"1.9.0;use the enum processor instead"`
|
2018-12-22 03:26:07 +08:00
|
|
|
|
|
|
|
|
sender wavefront.Sender
|
2021-03-02 05:04:35 +08:00
|
|
|
Log telegraf.Logger `toml:"-"`
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// catch many of the invalid chars that could appear in a metric or tag name
|
|
|
|
|
var sanitizedChars = strings.NewReplacer(
|
|
|
|
|
"!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-",
|
|
|
|
|
"*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-",
|
|
|
|
|
"[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-",
|
|
|
|
|
">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-",
|
|
|
|
|
"=", "-",
|
|
|
|
|
)
|
|
|
|
|
|
2019-04-03 02:47:25 +08:00
|
|
|
// catch many of the invalid chars that could appear in a metric or tag name
|
|
|
|
|
var strictSanitizedChars = strings.NewReplacer(
|
|
|
|
|
"!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-",
|
|
|
|
|
"*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-",
|
|
|
|
|
"[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-",
|
|
|
|
|
">", "-", "?", "-", "\\", "-", "|", "-", " ", "-", "=", "-",
|
|
|
|
|
)
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer
|
2021-11-25 03:33:45 +08:00
|
|
|
var sanitizedRegex = regexp.MustCompile(`[^a-zA-Z\d_.-]`)
|
2017-09-30 07:13:08 +08:00
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
var tagValueReplacer = strings.NewReplacer("*", "-")
|
2017-09-30 07:13:08 +08:00
|
|
|
|
|
|
|
|
var pathReplacer = strings.NewReplacer("_", "_")
|
|
|
|
|
|
|
|
|
|
type MetricPoint struct {
|
|
|
|
|
Metric string
|
|
|
|
|
Value float64
|
|
|
|
|
Timestamp int64
|
|
|
|
|
Source string
|
|
|
|
|
Tags map[string]string
|
|
|
|
|
}
|
|
|
|
|
|
2022-05-25 22:48:59 +08:00
|
|
|
func (*Wavefront) SampleConfig() string {
|
|
|
|
|
return sampleConfig
|
|
|
|
|
}
|
|
|
|
|
|
2022-08-02 05:27:56 +08:00
|
|
|
func senderURLFromURLAndToken(rawURL, token string) (string, error) {
|
|
|
|
|
newURL, err := url.Parse(rawURL)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", fmt.Errorf("could not parse the provided Url: %s", rawURL)
|
|
|
|
|
}
|
|
|
|
|
newURL.User = url.User(token)
|
|
|
|
|
|
|
|
|
|
return newURL.String(), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func senderURLFromHostAndPort(host string, port int) string {
|
|
|
|
|
return fmt.Sprintf("http://%s:%d", host, port)
|
|
|
|
|
}
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
func (w *Wavefront) Connect() error {
|
2020-11-03 12:12:48 +08:00
|
|
|
flushSeconds := 5
|
|
|
|
|
if w.ImmediateFlush {
|
|
|
|
|
flushSeconds = 86400 // Set a very long flush interval if we're flushing directly
|
|
|
|
|
}
|
2022-08-02 05:27:56 +08:00
|
|
|
var connectionURL string
|
2021-03-02 05:04:35 +08:00
|
|
|
if w.URL != "" {
|
|
|
|
|
w.Log.Debug("connecting over http/https using Url: %s", w.URL)
|
2022-08-02 05:27:56 +08:00
|
|
|
connectionURLWithToken, err := senderURLFromURLAndToken(w.URL, w.Token)
|
2018-12-22 03:26:07 +08:00
|
|
|
if err != nil {
|
2022-08-02 05:27:56 +08:00
|
|
|
return err
|
2018-12-22 03:26:07 +08:00
|
|
|
}
|
2022-08-02 05:27:56 +08:00
|
|
|
connectionURL = connectionURLWithToken
|
2018-12-22 03:26:07 +08:00
|
|
|
} else {
|
2022-08-02 05:27:56 +08:00
|
|
|
w.Log.Warnf("configuration with host/port is deprecated. Please use url.")
|
|
|
|
|
w.Log.Debugf("connecting over http using Host: %q and Port: %d", w.Host, w.Port)
|
|
|
|
|
connectionURL = senderURLFromHostAndPort(w.Host, w.Port)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sender, err := wavefront.NewSender(connectionURL,
|
|
|
|
|
wavefront.BatchSize(w.HTTPMaximumBatchSize),
|
|
|
|
|
wavefront.FlushIntervalSeconds(flushSeconds),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("could not create Wavefront Sender for the provided url")
|
2018-12-22 03:26:07 +08:00
|
|
|
}
|
|
|
|
|
|
2022-08-02 05:27:56 +08:00
|
|
|
w.sender = sender
|
|
|
|
|
|
2017-09-30 07:13:08 +08:00
|
|
|
if w.ConvertPaths && w.MetricSeparator == "_" {
|
|
|
|
|
w.ConvertPaths = false
|
|
|
|
|
}
|
|
|
|
|
if w.ConvertPaths {
|
|
|
|
|
pathReplacer = strings.NewReplacer("_", w.MetricSeparator)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Wavefront) Write(metrics []telegraf.Metric) error {
|
|
|
|
|
for _, m := range metrics {
|
2020-05-14 03:02:39 +08:00
|
|
|
for _, point := range w.buildMetrics(m) {
|
2018-12-22 03:26:07 +08:00
|
|
|
err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
2017-09-30 07:13:08 +08:00
|
|
|
if err != nil {
|
2020-11-14 06:08:05 +08:00
|
|
|
if isRetryable(err) {
|
2021-12-15 06:07:10 +08:00
|
|
|
if flushErr := w.sender.Flush(); flushErr != nil {
|
|
|
|
|
w.Log.Errorf("wavefront flushing error: %v", flushErr)
|
|
|
|
|
}
|
2021-12-15 06:43:37 +08:00
|
|
|
return fmt.Errorf("wavefront sending error: %v", err)
|
2020-11-14 06:08:05 +08:00
|
|
|
}
|
|
|
|
|
w.Log.Errorf("non-retryable error during Wavefront.Write: %v", err)
|
2020-11-16 22:54:58 +08:00
|
|
|
w.Log.Debugf("Non-retryable metric data: Name: %v, Value: %v, Timestamp: %v, Source: %v, PointTags: %v ", point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2020-11-03 12:12:48 +08:00
|
|
|
if w.ImmediateFlush {
|
|
|
|
|
w.Log.Debugf("Flushing batch of %d points", len(metrics))
|
|
|
|
|
return w.sender.Flush()
|
|
|
|
|
}
|
2017-09-30 07:13:08 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
func (w *Wavefront) buildMetrics(m telegraf.Metric) []*MetricPoint {
|
2022-08-02 05:27:56 +08:00
|
|
|
ret := make([]*MetricPoint, 0)
|
2017-09-30 07:13:08 +08:00
|
|
|
|
|
|
|
|
for fieldName, value := range m.Fields() {
|
|
|
|
|
var name string
|
|
|
|
|
if !w.SimpleFields && fieldName == "value" {
|
|
|
|
|
name = fmt.Sprintf("%s%s", w.Prefix, m.Name())
|
|
|
|
|
} else {
|
|
|
|
|
name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if w.UseRegex {
|
|
|
|
|
name = sanitizedRegex.ReplaceAllLiteralString(name, "-")
|
2019-04-03 02:47:25 +08:00
|
|
|
} else if w.UseStrict {
|
|
|
|
|
name = strictSanitizedChars.Replace(name)
|
2017-09-30 07:13:08 +08:00
|
|
|
} else {
|
|
|
|
|
name = sanitizedChars.Replace(name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if w.ConvertPaths {
|
|
|
|
|
name = pathReplacer.Replace(name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metric := &MetricPoint{
|
|
|
|
|
Metric: name,
|
2018-03-28 08:30:51 +08:00
|
|
|
Timestamp: m.Time().Unix(),
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metricValue, buildError := buildValue(value, metric.Metric, w)
|
|
|
|
|
if buildError != nil {
|
2020-08-27 00:58:28 +08:00
|
|
|
w.Log.Debugf("Error building tags: %s\n", buildError.Error())
|
2017-09-30 07:13:08 +08:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
metric.Value = metricValue
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
source, tags := w.buildTags(m.Tags())
|
2017-09-30 07:13:08 +08:00
|
|
|
metric.Source = source
|
|
|
|
|
metric.Tags = tags
|
|
|
|
|
|
|
|
|
|
ret = append(ret, metric)
|
|
|
|
|
}
|
|
|
|
|
return ret
|
|
|
|
|
}
|
|
|
|
|
|
2020-05-14 03:02:39 +08:00
|
|
|
func (w *Wavefront) buildTags(mTags map[string]string) (string, map[string]string) {
|
2018-06-12 05:54:08 +08:00
|
|
|
// Remove all empty tags.
|
|
|
|
|
for k, v := range mTags {
|
|
|
|
|
if v == "" {
|
|
|
|
|
delete(mTags, k)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
// find source, use source_override property if needed
|
2017-09-30 07:13:08 +08:00
|
|
|
var source string
|
2018-08-14 07:37:06 +08:00
|
|
|
if s, ok := mTags["source"]; ok {
|
|
|
|
|
source = s
|
|
|
|
|
delete(mTags, "source")
|
|
|
|
|
} else {
|
|
|
|
|
sourceTagFound := false
|
|
|
|
|
for _, s := range w.SourceOverride {
|
|
|
|
|
for k, v := range mTags {
|
|
|
|
|
if k == s {
|
|
|
|
|
source = v
|
2022-05-12 23:46:28 +08:00
|
|
|
if mTags["host"] != "" {
|
|
|
|
|
mTags["telegraf_host"] = mTags["host"]
|
|
|
|
|
}
|
|
|
|
|
|
2018-08-14 07:37:06 +08:00
|
|
|
sourceTagFound = true
|
|
|
|
|
delete(mTags, k)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if sourceTagFound {
|
2017-09-30 07:13:08 +08:00
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
2018-08-14 07:37:06 +08:00
|
|
|
|
|
|
|
|
if !sourceTagFound {
|
|
|
|
|
source = mTags["host"]
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
}
|
2018-12-22 03:26:07 +08:00
|
|
|
source = tagValueReplacer.Replace(source)
|
2017-09-30 07:13:08 +08:00
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
// remove default host tag
|
2017-09-30 07:13:08 +08:00
|
|
|
delete(mTags, "host")
|
|
|
|
|
|
2018-12-22 03:26:07 +08:00
|
|
|
// sanitize tag keys and values
|
|
|
|
|
tags := make(map[string]string)
|
|
|
|
|
for k, v := range mTags {
|
|
|
|
|
var key string
|
|
|
|
|
if w.UseRegex {
|
|
|
|
|
key = sanitizedRegex.ReplaceAllLiteralString(k, "-")
|
2019-04-03 02:47:25 +08:00
|
|
|
} else if w.UseStrict {
|
|
|
|
|
key = strictSanitizedChars.Replace(k)
|
2018-12-22 03:26:07 +08:00
|
|
|
} else {
|
|
|
|
|
key = sanitizedChars.Replace(k)
|
|
|
|
|
}
|
|
|
|
|
val := tagValueReplacer.Replace(v)
|
2020-05-14 03:02:39 +08:00
|
|
|
if w.TruncateTags {
|
|
|
|
|
if len(key) > maxTagLength {
|
|
|
|
|
w.Log.Warnf("Tag key length > 254. Skipping tag: %s", key)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if len(key)+len(val) > maxTagLength {
|
|
|
|
|
w.Log.Debugf("Key+value length > 254: %s", key)
|
|
|
|
|
val = val[:maxTagLength-len(key)]
|
|
|
|
|
}
|
|
|
|
|
}
|
2018-12-22 03:26:07 +08:00
|
|
|
tags[key] = val
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return source, tags
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func buildValue(v interface{}, name string, w *Wavefront) (float64, error) {
|
|
|
|
|
switch p := v.(type) {
|
|
|
|
|
case bool:
|
|
|
|
|
if w.ConvertBool {
|
|
|
|
|
if p {
|
|
|
|
|
return 1, nil
|
|
|
|
|
}
|
2021-02-09 00:18:40 +08:00
|
|
|
return 0, nil
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
case int64:
|
|
|
|
|
return float64(v.(int64)), nil
|
|
|
|
|
case uint64:
|
|
|
|
|
return float64(v.(uint64)), nil
|
|
|
|
|
case float64:
|
|
|
|
|
return v.(float64), nil
|
|
|
|
|
case string:
|
|
|
|
|
for prefix, mappings := range w.StringToNumber {
|
|
|
|
|
if strings.HasPrefix(name, prefix) {
|
|
|
|
|
for _, mapping := range mappings {
|
2021-03-24 23:27:46 +08:00
|
|
|
val, hasVal := mapping[p]
|
2017-09-30 07:13:08 +08:00
|
|
|
if hasVal {
|
|
|
|
|
return val, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
|
|
|
default:
|
|
|
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
|
|
|
}
|
|
|
|
|
return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *Wavefront) Close() error {
|
2018-12-22 03:26:07 +08:00
|
|
|
w.sender.Close()
|
2017-09-30 07:13:08 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
outputs.Add("wavefront", func() telegraf.Output {
|
|
|
|
|
return &Wavefront{
|
2022-06-30 03:27:56 +08:00
|
|
|
Token: "DUMMY_TOKEN",
|
|
|
|
|
MetricSeparator: ".",
|
|
|
|
|
ConvertPaths: true,
|
|
|
|
|
ConvertBool: true,
|
|
|
|
|
TruncateTags: false,
|
|
|
|
|
ImmediateFlush: true,
|
|
|
|
|
HTTPMaximumBatchSize: 10000,
|
2017-09-30 07:13:08 +08:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
2020-11-14 06:08:05 +08:00
|
|
|
|
|
|
|
|
// TODO: Currently there's no canonical way to exhaust all
|
|
|
|
|
// retryable/non-retryable errors from wavefront, so this implementation just
|
|
|
|
|
// handles known non-retryable errors in a case-by-case basis and assumes all
|
|
|
|
|
// other errors are retryable.
|
|
|
|
|
// A support ticket has been filed against wavefront to provide a canonical way
|
|
|
|
|
// to distinguish between retryable and non-retryable errors (link is not
|
|
|
|
|
// public).
|
|
|
|
|
func isRetryable(err error) bool {
|
|
|
|
|
if err != nil {
|
|
|
|
|
// "empty metric name" errors are non-retryable as retry will just keep
|
|
|
|
|
// getting the same error again and again.
|
|
|
|
|
if strings.Contains(err.Error(), "empty metric name") {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|