fix: Implement NaN and inf handling for elasticsearch output (#10196)

This commit is contained in:
Sven Rebhan 2021-12-08 18:56:54 +01:00 committed by GitHub
parent dbf55535c7
commit ec26975dec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 199 additions and 3 deletions

View File

@ -199,6 +199,15 @@ This plugin will format the events in the following way:
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
force_document_id = false
## Specifies the handling of NaN and Inf values.
## This option can have the following values:
## none -- do not modify field-values (default); will produce an error if NaNs or infs are encountered
## drop -- drop fields containing NaNs or infs
## replace -- replace with the value in "float_replacement_value" (default: 0.0)
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
```
### Permissions
@ -236,6 +245,8 @@ Additionally, you can specify dynamic index names by using tags with the notatio
* `template_name`: The template name used for telegraf indexes.
* `overwrite_template`: Set to true if you want telegraf to overwrite an existing template.
* `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents.
* `float_handling`: Specifies how to handle `NaN` and infinite field values. `"none"` (default) will do nothing, `"drop"` will drop the field and `replace` will replace the field value by the number in `float_replacement_value`
* `float_replacement_value`: Value (defaulting to `0.0`) to replace `NaN`s and `inf`s if `float_handling` is set to `replace`. Negative `inf` will be replaced by the negative value in this number to respect the sign of the field's original value.
## Known issues

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"net/http"
"strconv"
"strings"
@ -35,6 +36,8 @@ type Elasticsearch struct {
OverwriteTemplate bool
ForceDocumentID bool `toml:"force_document_id"`
MajorReleaseNumber int
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig
@ -95,6 +98,15 @@ var sampleConfig = `
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
force_document_id = false
## Specifies the handling of NaN and Inf values.
## This option can have the following values:
## none -- do not modify field-values (default); will produce an error if NaNs or infs are encountered
## drop -- drop fields containing NaNs or infs
## replace -- replace with the value in "float_replacement_value" (default: 0.0)
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
`
const telegrafTemplate = `
@ -177,6 +189,15 @@ func (a *Elasticsearch) Connect() error {
return fmt.Errorf("elasticsearch urls or index_name is not defined")
}
// Determine if we should process NaN and inf values
switch a.FloatHandling {
case "", "none":
a.FloatHandling = "none"
case "drop", "replace":
default:
return fmt.Errorf("invalid float_handling type %q", a.FloatHandling)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout))
defer cancel()
@ -278,12 +299,31 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())
// Handle NaN and inf field-values
fields := make(map[string]interface{})
for k, value := range metric.Fields() {
v, ok := value.(float64)
if !ok || a.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) {
fields[k] = value
continue
}
if a.FloatHandling == "drop" {
continue
}
if math.IsNaN(v) || math.IsInf(v, 1) {
fields[k] = a.FloatReplacement
} else {
fields[k] = -a.FloatReplacement
}
}
m := make(map[string]interface{})
m["@timestamp"] = metric.Time()
m["measurement_name"] = name
m["tag"] = metric.Tags()
m[name] = metric.Fields()
m[name] = fields
br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m)

View File

@ -2,12 +2,14 @@ package elasticsearch
import (
"context"
"math"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
@ -41,6 +43,149 @@ func TestConnectAndWriteIntegration(t *testing.T) {
require.NoError(t, err)
}
func TestConnectAndWriteMetricWithNaNValueEmpty(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.Error(t, err, "error sending bulk request to Elasticsearch: json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueNone(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
FloatHandling: "none",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.Error(t, err, "error sending bulk request to Elasticsearch: json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueDrop(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
FloatHandling: "drop",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.NoError(t, err)
}
}
func TestConnectAndWriteMetricWithNaNValueReplacement(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
FloatHandling: "3.1415",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.NoError(t, err)
}
}
func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@ -121,7 +266,7 @@ func TestGetTagKeys(t *testing.T) {
Log: testutil.Logger{},
}
var tests = []struct {
tests := []struct {
IndexName string
ExpectedIndexName string
ExpectedTagKeys []string
@ -181,7 +326,7 @@ func TestGetIndexName(t *testing.T) {
Log: testutil.Logger{},
}
var tests = []struct {
tests := []struct {
EventTime time.Time
Tags map[string]string
TagKeys []string