diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 1df63db05..c0fe636f4 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -512,7 +512,9 @@ # template_name = "telegraf" # ## Set to true if you want telegraf to overwrite an existing template # overwrite_template = false - +# ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string +# ## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's +# force_document_id = false # # Send metrics to command as input over stdin # [[outputs.exec]] diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index cf8c4d9ca..2616ff1a6 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -196,6 +196,9 @@ This plugin will format the events in the following way: template_name = "telegraf" ## Set to true if you want telegraf to overwrite an existing template overwrite_template = false + ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string + ## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's + force_document_id = false ``` #### Permissions @@ -232,6 +235,7 @@ Additionally, you can specify dynamic index names by using tags with the notatio * `manage_template`: Set to true if you want telegraf to manage its index template. If enabled it will create a recommended index template for telegraf indexes. * `template_name`: The template name used for telegraf indexes. * `overwrite_template`: Set to true if you want telegraf to overwrite an existing template. +* `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents. ### Known issues diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 7b0c1c125..b17a945b3 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -11,6 +11,8 @@ import ( "text/template" "time" + "crypto/sha256" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" @@ -31,6 +33,7 @@ type Elasticsearch struct { ManageTemplate bool TemplateName string OverwriteTemplate bool + ForceDocumentId bool MajorReleaseNumber int tls.ClientConfig @@ -86,6 +89,9 @@ var sampleConfig = ` template_name = "telegraf" ## Set to true if you want telegraf to overwrite an existing template overwrite_template = false + ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string + ## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's + force_document_id = false ` const telegrafTemplate = ` @@ -242,6 +248,19 @@ func (a *Elasticsearch) Connect() error { return nil } +// GetPointID generates a unique ID for a Metric Point +func GetPointID(m telegraf.Metric) string { + + var buffer bytes.Buffer + //Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID + + buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10)) + buffer.WriteString(m.Name()) + buffer.WriteString(strconv.FormatUint(m.HashID(), 10)) + + return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes())) +} + func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil @@ -265,6 +284,11 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m) + if a.ForceDocumentId { + id := GetPointID(metric) + br.Id(id) + } + if a.MajorReleaseNumber <= 6 { br.Type("metrics") }