feat(outputs.elasticsearch): Allow custom template index settings (#15471)
This commit is contained in:
parent
28f6396348
commit
6eef8aa15c
|
|
@ -328,6 +328,16 @@ to use them.
|
||||||
# To pass custom HTTP headers please define it in a given below section
|
# To pass custom HTTP headers please define it in a given below section
|
||||||
# [outputs.elasticsearch.headers]
|
# [outputs.elasticsearch.headers]
|
||||||
# "X-Custom-Header" = "custom-value"
|
# "X-Custom-Header" = "custom-value"
|
||||||
|
|
||||||
|
## Template Index Settings
|
||||||
|
## Overrides the template settings.index section with any provided options.
|
||||||
|
## Defaults provided here in the config
|
||||||
|
# template_index_settings = {
|
||||||
|
# refresh_interval = "10s",
|
||||||
|
# mapping.total_fields.limit = 5000,
|
||||||
|
# auto_expand_replicas = "0-1",
|
||||||
|
# codec = "best_compression"
|
||||||
|
# }
|
||||||
```
|
```
|
||||||
|
|
||||||
### Permissions
|
### Permissions
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
|
@ -28,27 +29,28 @@ import (
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
type Elasticsearch struct {
|
type Elasticsearch struct {
|
||||||
AuthBearerToken config.Secret `toml:"auth_bearer_token"`
|
AuthBearerToken config.Secret `toml:"auth_bearer_token"`
|
||||||
DefaultPipeline string `toml:"default_pipeline"`
|
DefaultPipeline string `toml:"default_pipeline"`
|
||||||
DefaultTagValue string `toml:"default_tag_value"`
|
DefaultTagValue string `toml:"default_tag_value"`
|
||||||
EnableGzip bool `toml:"enable_gzip"`
|
EnableGzip bool `toml:"enable_gzip"`
|
||||||
EnableSniffer bool `toml:"enable_sniffer"`
|
EnableSniffer bool `toml:"enable_sniffer"`
|
||||||
FloatHandling string `toml:"float_handling"`
|
FloatHandling string `toml:"float_handling"`
|
||||||
FloatReplacement float64 `toml:"float_replacement_value"`
|
FloatReplacement float64 `toml:"float_replacement_value"`
|
||||||
ForceDocumentID bool `toml:"force_document_id"`
|
ForceDocumentID bool `toml:"force_document_id"`
|
||||||
HealthCheckInterval config.Duration `toml:"health_check_interval"`
|
HealthCheckInterval config.Duration `toml:"health_check_interval"`
|
||||||
HealthCheckTimeout config.Duration `toml:"health_check_timeout"`
|
HealthCheckTimeout config.Duration `toml:"health_check_timeout"`
|
||||||
IndexName string `toml:"index_name"`
|
IndexName string `toml:"index_name"`
|
||||||
ManageTemplate bool `toml:"manage_template"`
|
IndexTemplate map[string]interface{} `toml:"template_index_settings"`
|
||||||
OverwriteTemplate bool `toml:"overwrite_template"`
|
ManageTemplate bool `toml:"manage_template"`
|
||||||
Username config.Secret `toml:"username"`
|
OverwriteTemplate bool `toml:"overwrite_template"`
|
||||||
Password config.Secret `toml:"password"`
|
Username config.Secret `toml:"username"`
|
||||||
TemplateName string `toml:"template_name"`
|
Password config.Secret `toml:"password"`
|
||||||
Timeout config.Duration `toml:"timeout"`
|
TemplateName string `toml:"template_name"`
|
||||||
URLs []string `toml:"urls"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
UsePipeline string `toml:"use_pipeline"`
|
URLs []string `toml:"urls"`
|
||||||
Headers map[string]string `toml:"headers"`
|
UsePipeline string `toml:"use_pipeline"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
Headers map[string]string `toml:"headers"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
majorReleaseNumber int
|
majorReleaseNumber int
|
||||||
pipelineName string
|
pipelineName string
|
||||||
pipelineTagKeys []string
|
pipelineTagKeys []string
|
||||||
|
|
@ -66,12 +68,7 @@ const telegrafTemplate = `
|
||||||
"index_patterns" : [ "{{.TemplatePattern}}" ],
|
"index_patterns" : [ "{{.TemplatePattern}}" ],
|
||||||
{{ end }}
|
{{ end }}
|
||||||
"settings": {
|
"settings": {
|
||||||
"index": {
|
"index": {{.IndexTemplate}}
|
||||||
"refresh_interval": "10s",
|
|
||||||
"mapping.total_fields.limit": 5000,
|
|
||||||
"auto_expand_replicas" : "0-1",
|
|
||||||
"codec" : "best_compression"
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
"mappings" : {
|
"mappings" : {
|
||||||
{{ if (lt .Version 7) }}
|
{{ if (lt .Version 7) }}
|
||||||
|
|
@ -128,9 +125,18 @@ const telegrafTemplate = `
|
||||||
}
|
}
|
||||||
}`
|
}`
|
||||||
|
|
||||||
|
const defaultTemplateIndexSettings = `
|
||||||
|
{
|
||||||
|
"refresh_interval": "10s",
|
||||||
|
"mapping.total_fields.limit": 5000,
|
||||||
|
"auto_expand_replicas": "0-1",
|
||||||
|
"codec": "best_compression"
|
||||||
|
}`
|
||||||
|
|
||||||
type templatePart struct {
|
type templatePart struct {
|
||||||
TemplatePattern string
|
TemplatePattern string
|
||||||
Version int
|
Version int
|
||||||
|
IndexTemplate string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*Elasticsearch) SampleConfig() string {
|
func (*Elasticsearch) SampleConfig() string {
|
||||||
|
|
@ -369,19 +375,12 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
|
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
|
||||||
tp := templatePart{
|
data, err := a.createNewTemplate(templatePattern)
|
||||||
TemplatePattern: templatePattern + "*",
|
if err != nil {
|
||||||
Version: a.majorReleaseNumber,
|
|
||||||
}
|
|
||||||
|
|
||||||
t := template.Must(template.New("template").Parse(telegrafTemplate))
|
|
||||||
var tmpl bytes.Buffer
|
|
||||||
|
|
||||||
if err := t.Execute(&tmpl, tp); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx)
|
|
||||||
|
|
||||||
|
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(data.String()).Do(ctx)
|
||||||
if errCreateTemplate != nil {
|
if errCreateTemplate != nil {
|
||||||
return fmt.Errorf("elasticsearch failed to create index template %s: %w", a.TemplateName, errCreateTemplate)
|
return fmt.Errorf("elasticsearch failed to create index template %s: %w", a.TemplateName, errCreateTemplate)
|
||||||
}
|
}
|
||||||
|
|
@ -393,6 +392,33 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Elasticsearch) createNewTemplate(templatePattern string) (*bytes.Buffer, error) {
|
||||||
|
var indexTemplate string
|
||||||
|
if a.IndexTemplate != nil {
|
||||||
|
data, err := json.Marshal(&a.IndexTemplate)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("elasticsearch failed to create index settings for template %s: %w", a.TemplateName, err)
|
||||||
|
}
|
||||||
|
indexTemplate = string(data)
|
||||||
|
} else {
|
||||||
|
indexTemplate = defaultTemplateIndexSettings
|
||||||
|
}
|
||||||
|
|
||||||
|
tp := templatePart{
|
||||||
|
TemplatePattern: templatePattern + "*",
|
||||||
|
Version: a.majorReleaseNumber,
|
||||||
|
IndexTemplate: indexTemplate,
|
||||||
|
}
|
||||||
|
|
||||||
|
t := template.Must(template.New("template").Parse(telegrafTemplate))
|
||||||
|
var tmpl bytes.Buffer
|
||||||
|
|
||||||
|
if err := t.Execute(&tmpl, tp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &tmpl, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
|
func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
|
||||||
tagKeys := []string{}
|
tagKeys := []string{}
|
||||||
startTag := strings.Index(indexName, "{{")
|
startTag := strings.Index(indexName, "{{")
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package elasticsearch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
@ -755,3 +756,51 @@ func TestAuthorizationHeaderWhenBearerTokenIsPresent(t *testing.T) {
|
||||||
err = e.Write(testutil.MockMetrics())
|
err = e.Write(testutil.MockMetrics())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStandardIndexSettings(t *testing.T) {
|
||||||
|
e := &Elasticsearch{
|
||||||
|
TemplateName: "test",
|
||||||
|
IndexName: "telegraf-%Y.%m.%d",
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
buf, err := e.createNewTemplate("test")
|
||||||
|
require.NoError(t, err)
|
||||||
|
var jsonData esTemplate
|
||||||
|
err = json.Unmarshal(buf.Bytes(), &jsonData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
index := jsonData.Settings.Index
|
||||||
|
require.Equal(t, "10s", index["refresh_interval"])
|
||||||
|
require.Equal(t, float64(5000), index["mapping.total_fields.limit"])
|
||||||
|
require.Equal(t, "0-1", index["auto_expand_replicas"])
|
||||||
|
require.Equal(t, "best_compression", index["codec"])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDifferentIndexSettings(t *testing.T) {
|
||||||
|
e := &Elasticsearch{
|
||||||
|
TemplateName: "test",
|
||||||
|
IndexName: "telegraf-%Y.%m.%d",
|
||||||
|
IndexTemplate: map[string]interface{}{
|
||||||
|
"refresh_interval": "20s",
|
||||||
|
"mapping.total_fields.limit": 1000,
|
||||||
|
"codec": "best_compression",
|
||||||
|
},
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
buf, err := e.createNewTemplate("test")
|
||||||
|
require.NoError(t, err)
|
||||||
|
var jsonData esTemplate
|
||||||
|
err = json.Unmarshal(buf.Bytes(), &jsonData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
index := jsonData.Settings.Index
|
||||||
|
require.Equal(t, "20s", index["refresh_interval"])
|
||||||
|
require.Equal(t, float64(1000), index["mapping.total_fields.limit"])
|
||||||
|
require.Equal(t, "best_compression", index["codec"])
|
||||||
|
}
|
||||||
|
|
||||||
|
type esTemplate struct {
|
||||||
|
Settings esSettings `json:"settings"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type esSettings struct {
|
||||||
|
Index map[string]interface{} `json:"index"`
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -82,3 +82,13 @@
|
||||||
# To pass custom HTTP headers please define it in a given below section
|
# To pass custom HTTP headers please define it in a given below section
|
||||||
# [outputs.elasticsearch.headers]
|
# [outputs.elasticsearch.headers]
|
||||||
# "X-Custom-Header" = "custom-value"
|
# "X-Custom-Header" = "custom-value"
|
||||||
|
|
||||||
|
## Template Index Settings
|
||||||
|
## Overrides the template settings.index section with any provided options.
|
||||||
|
## Defaults provided here in the config
|
||||||
|
# template_index_settings = {
|
||||||
|
# refresh_interval = "10s",
|
||||||
|
# mapping.total_fields.limit = 5000,
|
||||||
|
# auto_expand_replicas = "0-1",
|
||||||
|
# codec = "best_compression"
|
||||||
|
# }
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue