Dynatrace output plugin (#7881)

This commit is contained in:
Thomas Schuetz 2020-08-12 17:51:53 +02:00 committed by GitHub
parent 78811f7b1c
commit 780fbfecb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 631 additions and 0 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
/.idea
/build
/telegraf
/telegraf.exe

View File

@ -10,6 +10,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
_ "github.com/influxdata/telegraf/plugins/outputs/dynatrace"
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/outputs/exec"
_ "github.com/influxdata/telegraf/plugins/outputs/execd"

View File

@ -0,0 +1,24 @@
# Dynatrace Output Plugin
This plugin writes telegraf metrics to a Dynatrace environment.
An API token is necessary, which can be obtained in your Dynatrace environment. Navigate to **Dynatrace > Settings > Integration > Dynatrace API** and create a new token with
'Data ingest' access scope enabled.
Telegraf measurements which can't be converted to a float64 are skipped.
Metrics fields are added to the measurement name by using '.' in the metric name.
### Configuration
```toml
[[outputs.dynatrace]]
## Dynatrace environment URL (e.g.: https://YOUR_DOMAIN/api/v2/metrics/ingest) or use the local ingest endpoint of your OneAgent monitored host (e.g.: http://127.0.0.1:14499/metrics/ingest).
environmentURL = ""
environmentApiToken = ""
## Optional prefix for metric names (e.g.: "telegraf.")
prefix = "telegraf."
## Flag for skipping the tls certificate check, just for testing purposes, should be false by default
skipCertificateCheck = false
```

View File

@ -0,0 +1,274 @@
package dynatrace
import (
"bytes"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"io/ioutil"
"math"
"net/http"
"regexp"
"sort"
"strconv"
"strings"
"time"
)
const (
oneAgentMetricsUrl = "http://127.0.0.1:14499/metrics/ingest"
)
var (
reNameAllowedCharList = regexp.MustCompile("[^A-Za-z0-9.]+")
maxDimKeyLen = 100
maxMetricKeyLen = 250
)
// Dynatrace Configuration for the Dynatrace output plugin
type Dynatrace struct {
URL string `toml:"url"`
APIToken string `toml:"api_token"`
InsecureSkipVerify bool `toml:"insecure_skip_verify"`
Prefix string `toml:"prefix"`
Log telegraf.Logger `toml:"-"`
Timeout internal.Duration `toml:"timeout"`
tls.ClientConfig
client *http.Client
}
const sampleConfig = `
## For usage with the Dynatrace OneAgent you can omit any configuration,
## the only requirement is that the OneAgent is running on the same host.
## Only setup environment url and token if you want to monitor a Host without the OneAgent present.
##
## Your Dynatrace environment URL.
## For Dynatrace OneAgent you can leave this empty or set it to "http://127.0.0.1:14499/metrics/ingest" (default)
## For Dynatrace SaaS environments the URL scheme is "https://{your-environment-id}.live.dynatrace.com/api/v2/metrics/ingest"
## For Dynatrace Managed environments the URL scheme is "https://{your-domain}/e/{your-environment-id}/api/v2/metrics/ingest"
url = ""
## Your Dynatrace API token.
## Create an API token within your Dynatrace environment, by navigating to Settings > Integration > Dynatrace API
## The API token needs data ingest scope permission. When using OneAgent, no API token is required.
api_token = ""
## Optional prefix for metric names (e.g.: "telegraf.")
prefix = "telegraf."
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Optional flag for ignoring tls certificate check
# insecure_skip_verify = false
## Connection timeout, defaults to "5s" if not set.
timeout = "5s"
`
// Connect Connects the Dynatrace output plugin to the Telegraf stream
func (d *Dynatrace) Connect() error {
return nil
}
// Close Closes the Dynatrace output plugin
func (d *Dynatrace) Close() error {
d.client = nil
return nil
}
// SampleConfig Returns a sample configuration for the Dynatrace output plugin
func (d *Dynatrace) SampleConfig() string {
return sampleConfig
}
// Description returns the description for the Dynatrace output plugin
func (d *Dynatrace) Description() string {
return "Send telegraf metrics to a Dynatrace environment"
}
// Normalizes a metric keys or metric dimension identifiers
// according to Dynatrace format.
func (d *Dynatrace) normalize(s string, max int) (string, error) {
s = reNameAllowedCharList.ReplaceAllString(s, "_")
// Strip Digits and underscores if they are at the beginning of the string
normalizedString := strings.TrimLeft(s, "_0123456789")
for strings.HasPrefix(normalizedString, "_") {
normalizedString = normalizedString[1:]
}
if len(normalizedString) > max {
normalizedString = normalizedString[:max]
}
for strings.HasSuffix(normalizedString, "_") {
normalizedString = normalizedString[:len(normalizedString)-1]
}
if len(normalizedString) == 0 {
return "", fmt.Errorf("error normalizing the string: %s", s)
}
return normalizedString, nil
}
func (d *Dynatrace) escape(v string) string {
return strconv.Quote(v)
}
func (d *Dynatrace) Write(metrics []telegraf.Metric) error {
var buf bytes.Buffer
var tagb bytes.Buffer
if len(metrics) == 0 {
return nil
}
for _, metric := range metrics {
// first write the tags into a buffer
tagb.Reset()
if len(metric.Tags()) > 0 {
keys := make([]string, 0, len(metric.Tags()))
for k := range metric.Tags() {
keys = append(keys, k)
}
// sort tag keys to expect the same order in ech run
sort.Strings(keys)
for _, k := range keys {
tagKey, err := d.normalize(k, maxDimKeyLen)
if err != nil {
continue
}
fmt.Fprintf(&tagb, ",%s=%s", strings.ToLower(tagKey), d.escape(metric.Tags()[k]))
}
}
if len(metric.Fields()) > 0 {
for k, v := range metric.Fields() {
var value string
switch v := v.(type) {
case string:
continue
case float64:
if !math.IsNaN(v) && !math.IsInf(v, 0) {
value = fmt.Sprintf("%f", v)
} else {
continue
}
case uint64:
value = strconv.FormatUint(v, 10)
case int64:
value = strconv.FormatInt(v, 10)
case bool:
if v {
value = "1"
} else {
value = "0"
}
default:
d.Log.Debugf("Dynatrace type not supported! %s", v)
continue
}
// metric name
metricKey, err := d.normalize(k, maxMetricKeyLen)
if err != nil {
continue
}
metricID, err := d.normalize(d.Prefix+metric.Name()+"."+metricKey, maxMetricKeyLen)
// write metric name combined with its field
if err != nil {
continue
}
fmt.Fprintf(&buf, "%s", metricID)
// add the tag string
fmt.Fprintf(&buf, "%s", tagb.String())
// write measured value
fmt.Fprintf(&buf, " %v\n", value)
}
}
}
return d.send(buf.Bytes())
}
func (d *Dynatrace) send(msg []byte) error {
var err error
req, err := http.NewRequest("POST", d.URL, bytes.NewBuffer(msg))
if err != nil {
d.Log.Errorf("Dynatrace error: %s", err.Error())
return fmt.Errorf("Dynatrace error while creating HTTP request:, %s", err.Error())
}
req.Header.Add("Content-Type", "text/plain; charset=UTF-8")
if len(d.APIToken) != 0 {
req.Header.Add("Authorization", "Api-Token "+d.APIToken)
}
// add user-agent header to identify metric source
req.Header.Add("User-Agent", "telegraf")
resp, err := d.client.Do(req)
if err != nil {
d.Log.Errorf("Dynatrace error: %s", err.Error())
fmt.Println(req)
return fmt.Errorf("Dynatrace error while sending HTTP request:, %s", err.Error())
}
defer resp.Body.Close()
// print metric line results as info log
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted {
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
d.Log.Errorf("Dynatrace error reading response")
}
bodyString := string(bodyBytes)
d.Log.Debugf("Dynatrace returned: %s", bodyString)
} else {
return fmt.Errorf("Dynatrace request failed with response code:, %d", resp.StatusCode)
}
return nil
}
func (d *Dynatrace) Init() error {
if len(d.URL) == 0 {
d.Log.Infof("Dynatrace URL is empty, defaulting to OneAgent metrics interface")
d.URL = oneAgentMetricsUrl
}
if d.URL != oneAgentMetricsUrl && len(d.APIToken) == 0 {
d.Log.Errorf("Dynatrace api_token is a required field for Dynatrace output")
return fmt.Errorf("api_token is a required field for Dynatrace output")
}
tlsCfg, err := d.ClientConfig.TLSConfig()
if err != nil {
return err
}
d.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
Timeout: d.Timeout.Duration,
}
return nil
}
func init() {
outputs.Add("dynatrace", func() telegraf.Output {
return &Dynatrace{
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}

View File

@ -0,0 +1,331 @@
package dynatrace
import (
"encoding/json"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestNilMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{
Timeout: internal.Duration{Duration: time.Second * 5},
}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
err = d.Write(nil)
require.NoError(t, err)
}
func TestEmptyMetricsSlice(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
empty := []telegraf.Metric{}
err = d.Write(empty)
require.NoError(t, err)
}
func TestMockURL(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
err = d.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestMissingURL(t *testing.T) {
d := &Dynatrace{}
d.Log = testutil.Logger{}
err := d.Init()
require.Equal(t, oneAgentMetricsUrl, d.URL)
err = d.Connect()
require.Equal(t, oneAgentMetricsUrl, d.URL)
require.NoError(t, err)
}
func TestMissingAPITokenMissingURL(t *testing.T) {
d := &Dynatrace{}
d.Log = testutil.Logger{}
err := d.Init()
require.Equal(t, oneAgentMetricsUrl, d.URL)
err = d.Connect()
require.Equal(t, oneAgentMetricsUrl, d.URL)
require.NoError(t, err)
}
func TestMissingAPIToken(t *testing.T) {
d := &Dynatrace{}
d.URL = "test"
d.Log = testutil.Logger{}
err := d.Init()
require.Error(t, err)
}
func TestSendMetric(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check the encoded result
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
require.NoError(t, err)
}
bodyString := string(bodyBytes)
expected := "mymeasurement.myfield,host=\"192.168.0.1\",nix=\"nix\" 3.140000\nmymeasurement.value,host=\"192.168.0.1\" 3.140000\n"
if bodyString != expected {
t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString)
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "nix": "nix"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
metrics := []telegraf.Metric{m1, m2}
err = d.Write(metrics)
require.NoError(t, err)
}
func TestSendSingleMetricWithUnorderedTags(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check the encoded result
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
require.NoError(t, err)
}
bodyString := string(bodyBytes)
expected := "mymeasurement.myfield,a=\"test\",b=\"test\",c=\"test\" 3.140000\n"
if bodyString != expected {
t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString)
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"a": "test", "c": "test", "b": "test"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
metrics := []telegraf.Metric{m1}
err = d.Write(metrics)
require.NoError(t, err)
}
func TestSendMetricWithoutTags(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// check the encoded result
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
require.NoError(t, err)
}
bodyString := string(bodyBytes)
expected := "mymeasurement.myfield 3.140000\n"
if bodyString != expected {
t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString)
}
json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
metrics := []telegraf.Metric{m1}
err = d.Write(metrics)
require.NoError(t, err)
}
func TestSendMetricWithUpperCaseTagKeys(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// check the encoded result
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
require.NoError(t, err)
}
bodyString := string(bodyBytes)
expected := "mymeasurement.myfield,aaa=\"test\",b_b=\"test\",ccc=\"test\" 3.140000\n"
if bodyString != expected {
t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString)
}
json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"AAA": "test", "CcC": "test", "B B": "test"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
metrics := []telegraf.Metric{m1}
err = d.Write(metrics)
require.NoError(t, err)
}
func TestSendBooleanMetricWithoutTags(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// check the encoded result
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
require.NoError(t, err)
}
bodyString := string(bodyBytes)
expected := "mymeasurement.myfield 1\n"
if bodyString != expected {
t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString)
}
json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`)
}))
defer ts.Close()
d := &Dynatrace{}
d.URL = ts.URL
d.APIToken = "123"
d.Log = testutil.Logger{}
err := d.Init()
require.NoError(t, err)
err = d.Connect()
require.NoError(t, err)
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{},
map[string]interface{}{"myfield": bool(true)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
metrics := []telegraf.Metric{m1}
err = d.Write(metrics)
require.NoError(t, err)
}