added force_document_id option to ES output enable resend data and avoiding duplicated ES documents, fix #7891 (#8019)
This commit is contained in:
parent
8c28661ca7
commit
9a06ac1d6a
|
|
@ -512,7 +512,9 @@
|
||||||
# template_name = "telegraf"
|
# template_name = "telegraf"
|
||||||
# ## Set to true if you want telegraf to overwrite an existing template
|
# ## Set to true if you want telegraf to overwrite an existing template
|
||||||
# overwrite_template = false
|
# overwrite_template = false
|
||||||
|
# ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
|
||||||
|
# ## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
|
||||||
|
# force_document_id = false
|
||||||
|
|
||||||
# # Send metrics to command as input over stdin
|
# # Send metrics to command as input over stdin
|
||||||
# [[outputs.exec]]
|
# [[outputs.exec]]
|
||||||
|
|
|
||||||
|
|
@ -196,6 +196,9 @@ This plugin will format the events in the following way:
|
||||||
template_name = "telegraf"
|
template_name = "telegraf"
|
||||||
## Set to true if you want telegraf to overwrite an existing template
|
## Set to true if you want telegraf to overwrite an existing template
|
||||||
overwrite_template = false
|
overwrite_template = false
|
||||||
|
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
|
||||||
|
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
|
||||||
|
force_document_id = false
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Permissions
|
#### Permissions
|
||||||
|
|
@ -232,6 +235,7 @@ Additionally, you can specify dynamic index names by using tags with the notatio
|
||||||
* `manage_template`: Set to true if you want telegraf to manage its index template. If enabled it will create a recommended index template for telegraf indexes.
|
* `manage_template`: Set to true if you want telegraf to manage its index template. If enabled it will create a recommended index template for telegraf indexes.
|
||||||
* `template_name`: The template name used for telegraf indexes.
|
* `template_name`: The template name used for telegraf indexes.
|
||||||
* `overwrite_template`: Set to true if you want telegraf to overwrite an existing template.
|
* `overwrite_template`: Set to true if you want telegraf to overwrite an existing template.
|
||||||
|
* `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents.
|
||||||
|
|
||||||
### Known issues
|
### Known issues
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,8 @@ import (
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"crypto/sha256"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
|
|
@ -31,6 +33,7 @@ type Elasticsearch struct {
|
||||||
ManageTemplate bool
|
ManageTemplate bool
|
||||||
TemplateName string
|
TemplateName string
|
||||||
OverwriteTemplate bool
|
OverwriteTemplate bool
|
||||||
|
ForceDocumentId bool
|
||||||
MajorReleaseNumber int
|
MajorReleaseNumber int
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
||||||
|
|
@ -86,6 +89,9 @@ var sampleConfig = `
|
||||||
template_name = "telegraf"
|
template_name = "telegraf"
|
||||||
## Set to true if you want telegraf to overwrite an existing template
|
## Set to true if you want telegraf to overwrite an existing template
|
||||||
overwrite_template = false
|
overwrite_template = false
|
||||||
|
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
|
||||||
|
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
|
||||||
|
force_document_id = false
|
||||||
`
|
`
|
||||||
|
|
||||||
const telegrafTemplate = `
|
const telegrafTemplate = `
|
||||||
|
|
@ -242,6 +248,19 @@ func (a *Elasticsearch) Connect() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPointID generates a unique ID for a Metric Point
|
||||||
|
func GetPointID(m telegraf.Metric) string {
|
||||||
|
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
//Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID
|
||||||
|
|
||||||
|
buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
|
||||||
|
buffer.WriteString(m.Name())
|
||||||
|
buffer.WriteString(strconv.FormatUint(m.HashID(), 10))
|
||||||
|
|
||||||
|
return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||||
if len(metrics) == 0 {
|
if len(metrics) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -265,6 +284,11 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m)
|
br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m)
|
||||||
|
|
||||||
|
if a.ForceDocumentId {
|
||||||
|
id := GetPointID(metric)
|
||||||
|
br.Id(id)
|
||||||
|
}
|
||||||
|
|
||||||
if a.MajorReleaseNumber <= 6 {
|
if a.MajorReleaseNumber <= 6 {
|
||||||
br.Type("metrics")
|
br.Type("metrics")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue