From a8355c74b976389aa22e347cabab56a3b99d6773 Mon Sep 17 00:00:00 2001 From: cnemo-cenic <117843978+cnemo-cenic@users.noreply.github.com> Date: Tue, 9 Jul 2024 12:09:02 -0700 Subject: [PATCH] feat(outputs.elasticsearch): Support data streams (#15613) --- plugins/outputs/elasticsearch/README.md | 7 +++ .../outputs/elasticsearch/elasticsearch.go | 5 +++ .../elasticsearch/elasticsearch_test.go | 44 +++++++++++++++++++ plugins/outputs/elasticsearch/sample.conf | 4 ++ 4 files changed, 60 insertions(+) diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index 01cba2b2e..6b1e5a591 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -286,6 +286,10 @@ to use them. # default_tag_value = "none" index_name = "telegraf-%Y.%m.%d" # required. + ## Optional Index Config + ## Set to true if Telegraf should use the "create" OpType while indexing + # use_optype_create = false + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" @@ -397,6 +401,9 @@ the `default_tag_value` will be used instead. `inf`s if `float_handling` is set to `replace`. Negative `inf` will be replaced by the negative value in this number to respect the sign of the field's original value. +* `use_optype_create`: If set, the "create" operation type will be used when + indexing into Elasticsearch, which is needed when using the Elasticsearch + data streams feature. * `use_pipeline`: If set, the set value will be used as the pipeline to call when sending events to elasticsearch. Additionally, you can specify dynamic pipeline names by using tags with the notation ```{{tag_name}}```. If the tag diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 5920707ec..a3e0ea5ef 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -43,6 +43,7 @@ type Elasticsearch struct { IndexTemplate map[string]interface{} `toml:"template_index_settings"` ManageTemplate bool `toml:"manage_template"` OverwriteTemplate bool `toml:"overwrite_template"` + UseOpTypeCreate bool `toml:"use_optype_create"` Username config.Secret `toml:"username"` Password config.Secret `toml:"password"` TemplateName string `toml:"template_name"` @@ -304,6 +305,10 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m) + if a.UseOpTypeCreate { + br.OpType("create") + } + if a.ForceDocumentID { id := GetPointID(metric) br.Id(id) diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index c41193c02..f7d39fea9 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -298,6 +298,50 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) { require.Error(t, err) } +func TestUseOpTypeCreate(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: config.Duration(time.Second * 5), + EnableGzip: true, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + UseOpTypeCreate: true, + Log: testutil.Logger{}, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + metrics := []telegraf.Metric{ + testutil.TestMetric(1), + } + + err := e.Connect() + require.NoError(t, err) + + err = e.manageTemplate(ctx) + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.NoError(t, err) + } +} + func TestTemplateManagementIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") diff --git a/plugins/outputs/elasticsearch/sample.conf b/plugins/outputs/elasticsearch/sample.conf index b4eea3242..9bd3962d6 100644 --- a/plugins/outputs/elasticsearch/sample.conf +++ b/plugins/outputs/elasticsearch/sample.conf @@ -40,6 +40,10 @@ # default_tag_value = "none" index_name = "telegraf-%Y.%m.%d" # required. + ## Optional Index Config + ## Set to true if Telegraf should use the "create" OpType while indexing + # use_optype_create = false + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem"