fix(outputs.wavefront): update wavefront sdk and use non-deprecated APIs (#11560)

This commit is contained in:
Luke Winikates 2022-08-01 14:27:56 -07:00 committed by GitHub
parent 9f57f9408c
commit f0aad2f8d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 40 deletions

2
go.mod
View File

@ -146,7 +146,7 @@ require (
github.com/vapourismo/knx-go v0.0.0-20211128234507-8198fa17db36 github.com/vapourismo/knx-go v0.0.0-20211128234507-8198fa17db36
github.com/vjeantet/grok v1.0.1 github.com/vjeantet/grok v1.0.1
github.com/vmware/govmomi v0.29.0 github.com/vmware/govmomi v0.29.0
github.com/wavefronthq/wavefront-sdk-go v0.9.11 github.com/wavefronthq/wavefront-sdk-go v0.10.1
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf
github.com/xdg/scram v1.0.5 github.com/xdg/scram v1.0.5
github.com/yuin/goldmark v1.4.1 github.com/yuin/goldmark v1.4.1

4
go.sum
View File

@ -2267,8 +2267,8 @@ github.com/vjeantet/grok v1.0.1/go.mod h1:ax1aAchzC6/QMXMcyzHQGZWaW1l195+uMYIkCW
github.com/vmware/govmomi v0.29.0 h1:SHJQ7DUc4fltFZv16znJNGHR1/XhiDK5iKxm2OqwkuU= github.com/vmware/govmomi v0.29.0 h1:SHJQ7DUc4fltFZv16znJNGHR1/XhiDK5iKxm2OqwkuU=
github.com/vmware/govmomi v0.29.0/go.mod h1:F7adsVewLNHsW/IIm7ziFURaXDaHEwcc+ym4r3INMdY= github.com/vmware/govmomi v0.29.0/go.mod h1:F7adsVewLNHsW/IIm7ziFURaXDaHEwcc+ym4r3INMdY=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM= github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/wavefronthq/wavefront-sdk-go v0.9.11 h1:3qv/yyNNyLKPQftQWFrfHGUv50e/gMxKlUQnILlvHKw= github.com/wavefronthq/wavefront-sdk-go v0.10.1 h1:my6q/yDaSi4bADHr9006YrHBhTdjEtpU0UpT0NDaSw8=
github.com/wavefronthq/wavefront-sdk-go v0.9.11/go.mod h1:AcW8zJJcYodB7B9KYzoxVH6K0fmYd6MgpmXE1LMo+OU= github.com/wavefronthq/wavefront-sdk-go v0.10.1/go.mod h1:JHflWDtew1icwZB6OEvHedBFD9T1h7Se/m+dBSjywu8=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=

View File

@ -1,27 +1,25 @@
# Wavefront Output Plugin # Wavefront Output Plugin
This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in This plugin writes to a [Wavefront](https://www.wavefront.com) instance or a
Wavefront data format over TCP. Wavefront Proxy instance over HTTP or HTTPS.
## Configuration ## Configuration
```toml @sample.conf ```toml @sample.conf
# Configuration for Wavefront server to send metrics to
[[outputs.wavefront]] [[outputs.wavefront]]
## Url for Wavefront Direct Ingestion. For Wavefront Proxy Ingestion, see ## Url for Wavefront API or Wavefront Proxy instance.
## the 'host' and 'port' options below.
url = "https://metrics.wavefront.com" url = "https://metrics.wavefront.com"
## Authentication Token for Wavefront. Only required if using Direct Ingestion ## Authentication Token for Wavefront. Required if using Direct Ingestion. Not required if using a Wavefront Proxy.
#token = "DUMMY_TOKEN" #token = "DUMMY_TOKEN"
## Maximum number of metrics to send per batch for Direct Ingestion. Ignored unless 'url' is set. This value should be higher than the `metric_batch_size`. Default is 10,000. Values higher than 40,000 are not recommended. ## Maximum number of metrics to send per HTTP request. This value should be higher than the `metric_batch_size`. Default is 10,000. Values higher than 40,000 are not recommended.
# http_maximum_batch_size = 10000 # http_maximum_batch_size = 10000
## DNS name of the wavefront proxy server. Do not use if url is specified ## Deprecated. DNS name of the Wavefront server or Wavefront Proxy. Use the `url` field instead.
#host = "wavefront.example.com" #host = "wavefront.example.com"
## Port that the Wavefront proxy server listens on. Do not use if url is specified ## Deprecated. Wavefront proxy port. Use the `url` field instead.
#port = 2878 #port = 2878
## prefix for metrics keys ## prefix for metrics keys

View File

@ -1,19 +1,17 @@
# Configuration for Wavefront server to send metrics to
[[outputs.wavefront]] [[outputs.wavefront]]
## Url for Wavefront Direct Ingestion. For Wavefront Proxy Ingestion, see ## Url for Wavefront API or Wavefront Proxy instance.
## the 'host' and 'port' options below.
url = "https://metrics.wavefront.com" url = "https://metrics.wavefront.com"
## Authentication Token for Wavefront. Only required if using Direct Ingestion ## Authentication Token for Wavefront. Required if using Direct Ingestion. Not required if using a Wavefront Proxy.
#token = "DUMMY_TOKEN" #token = "DUMMY_TOKEN"
## Maximum number of metrics to send per batch for Direct Ingestion. Ignored unless 'url' is set. This value should be higher than the `metric_batch_size`. Default is 10,000. Values higher than 40,000 are not recommended. ## Maximum number of metrics to send per HTTP request. This value should be higher than the `metric_batch_size`. Default is 10,000. Values higher than 40,000 are not recommended.
# http_maximum_batch_size = 10000 # http_maximum_batch_size = 10000
## DNS name of the wavefront proxy server. Do not use if url is specified ## Deprecated. DNS name of the Wavefront server or Wavefront Proxy. Use the `url` field instead.
#host = "wavefront.example.com" #host = "wavefront.example.com"
## Port that the Wavefront proxy server listens on. Do not use if url is specified ## Deprecated. Wavefront proxy port. Use the `url` field instead.
#port = 2878 #port = 2878
## prefix for metrics keys ## prefix for metrics keys

View File

@ -4,6 +4,7 @@ package wavefront
import ( import (
_ "embed" _ "embed"
"fmt" "fmt"
"net/url"
"regexp" "regexp"
"strings" "strings"
@ -22,8 +23,8 @@ const maxTagLength = 254
type Wavefront struct { type Wavefront struct {
URL string `toml:"url"` URL string `toml:"url"`
Token string `toml:"token"` Token string `toml:"token"`
Host string `toml:"host"` Host string `toml:"host" deprecated:"2.4.0;use url instead"`
Port int `toml:"port"` Port int `toml:"port" deprecated:"2.4.0;use url instead"`
Prefix string `toml:"prefix"` Prefix string `toml:"prefix"`
SimpleFields bool `toml:"simple_fields"` SimpleFields bool `toml:"simple_fields"`
MetricSeparator string `toml:"metric_separator"` MetricSeparator string `toml:"metric_separator"`
@ -77,36 +78,50 @@ func (*Wavefront) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func senderURLFromURLAndToken(rawURL, token string) (string, error) {
newURL, err := url.Parse(rawURL)
if err != nil {
return "", fmt.Errorf("could not parse the provided Url: %s", rawURL)
}
newURL.User = url.User(token)
return newURL.String(), nil
}
func senderURLFromHostAndPort(host string, port int) string {
return fmt.Sprintf("http://%s:%d", host, port)
}
func (w *Wavefront) Connect() error { func (w *Wavefront) Connect() error {
flushSeconds := 5 flushSeconds := 5
if w.ImmediateFlush { if w.ImmediateFlush {
flushSeconds = 86400 // Set a very long flush interval if we're flushing directly flushSeconds = 86400 // Set a very long flush interval if we're flushing directly
} }
var connectionURL string
if w.URL != "" { if w.URL != "" {
w.Log.Debug("connecting over http/https using Url: %s", w.URL) w.Log.Debug("connecting over http/https using Url: %s", w.URL)
sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{ connectionURLWithToken, err := senderURLFromURLAndToken(w.URL, w.Token)
Server: w.URL,
Token: w.Token,
FlushIntervalSeconds: flushSeconds,
BatchSize: w.HTTPMaximumBatchSize,
})
if err != nil { if err != nil {
return fmt.Errorf("could not create Wavefront Sender for Url: %s", w.URL) return err
} }
w.sender = sender connectionURL = connectionURLWithToken
} else { } else {
w.Log.Debugf("connecting over tcp using Host: %q and Port: %d", w.Host, w.Port) w.Log.Warnf("configuration with host/port is deprecated. Please use url.")
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{ w.Log.Debugf("connecting over http using Host: %q and Port: %d", w.Host, w.Port)
Host: w.Host, connectionURL = senderURLFromHostAndPort(w.Host, w.Port)
MetricsPort: w.Port,
FlushIntervalSeconds: flushSeconds,
})
if err != nil {
return fmt.Errorf("could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
}
w.sender = sender
} }
sender, err := wavefront.NewSender(connectionURL,
wavefront.BatchSize(w.HTTPMaximumBatchSize),
wavefront.FlushIntervalSeconds(flushSeconds),
)
if err != nil {
return fmt.Errorf("could not create Wavefront Sender for the provided url")
}
w.sender = sender
if w.ConvertPaths && w.MetricSeparator == "_" { if w.ConvertPaths && w.MetricSeparator == "_" {
w.ConvertPaths = false w.ConvertPaths = false
} }
@ -140,7 +155,7 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
} }
func (w *Wavefront) buildMetrics(m telegraf.Metric) []*MetricPoint { func (w *Wavefront) buildMetrics(m telegraf.Metric) []*MetricPoint {
ret := []*MetricPoint{} ret := make([]*MetricPoint, 0)
for fieldName, value := range m.Fields() { for fieldName, value := range m.Fields() {
var name string var name string

View File

@ -355,6 +355,17 @@ func TestTagLimits(t *testing.T) {
require.Equal(t, longKey, tags[longKey]) require.Equal(t, longKey, tags[longKey])
} }
func TestSenderURLFromHostAndPort(t *testing.T) {
require.Equal(t, "http://localhost:2878", senderURLFromHostAndPort("localhost", 2878))
}
func TestSenderURLFromURLAndToken(t *testing.T) {
url, err := senderURLFromURLAndToken("https://surf.wavefront.com", "11111111-2222-3333-4444-555555555555")
require.Nil(t, err)
require.Equal(t, "https://11111111-2222-3333-4444-555555555555@surf.wavefront.com",
url)
}
func TestDefaults(t *testing.T) { func TestDefaults(t *testing.T) {
defaultWavefront := outputs.Outputs["wavefront"]().(*Wavefront) defaultWavefront := outputs.Outputs["wavefront"]().(*Wavefront)
require.Equal(t, 10000, defaultWavefront.HTTPMaximumBatchSize) require.Equal(t, 10000, defaultWavefront.HTTPMaximumBatchSize)