Add ability to enable gzip compression in elasticsearch output (#8913)
This commit is contained in:
parent
3e9f191142
commit
b56ffdc498
|
|
@ -30,6 +30,7 @@ type Elasticsearch struct {
|
||||||
EnableSniffer bool
|
EnableSniffer bool
|
||||||
Timeout config.Duration
|
Timeout config.Duration
|
||||||
HealthCheckInterval config.Duration
|
HealthCheckInterval config.Duration
|
||||||
|
EnableGzip bool
|
||||||
ManageTemplate bool
|
ManageTemplate bool
|
||||||
TemplateName string
|
TemplateName string
|
||||||
OverwriteTemplate bool
|
OverwriteTemplate bool
|
||||||
|
|
@ -50,6 +51,8 @@ var sampleConfig = `
|
||||||
## Set to true to ask Elasticsearch a list of all cluster nodes,
|
## Set to true to ask Elasticsearch a list of all cluster nodes,
|
||||||
## thus it is not necessary to list all nodes in the urls config option.
|
## thus it is not necessary to list all nodes in the urls config option.
|
||||||
enable_sniffer = false
|
enable_sniffer = false
|
||||||
|
## Set to true to enable gzip compression
|
||||||
|
enable_gzip = false
|
||||||
## Set the interval to check if the Elasticsearch nodes are available
|
## Set the interval to check if the Elasticsearch nodes are available
|
||||||
## Setting to "0s" will disable the health check (not recommended in production)
|
## Setting to "0s" will disable the health check (not recommended in production)
|
||||||
health_check_interval = "10s"
|
health_check_interval = "10s"
|
||||||
|
|
@ -197,6 +200,7 @@ func (a *Elasticsearch) Connect() error {
|
||||||
elastic.SetSniff(a.EnableSniffer),
|
elastic.SetSniff(a.EnableSniffer),
|
||||||
elastic.SetURL(a.URLs...),
|
elastic.SetURL(a.URLs...),
|
||||||
elastic.SetHealthcheckInterval(time.Duration(a.HealthCheckInterval)),
|
elastic.SetHealthcheckInterval(time.Duration(a.HealthCheckInterval)),
|
||||||
|
elastic.SetGzip(a.EnableGzip),
|
||||||
)
|
)
|
||||||
|
|
||||||
if a.Username != "" && a.Password != "" {
|
if a.Username != "" && a.Password != "" {
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,8 @@ package elasticsearch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -22,6 +24,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
|
||||||
URLs: urls,
|
URLs: urls,
|
||||||
IndexName: "test-%Y.%m.%d",
|
IndexName: "test-%Y.%m.%d",
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
EnableGzip: true,
|
||||||
ManageTemplate: true,
|
ManageTemplate: true,
|
||||||
TemplateName: "telegraf",
|
TemplateName: "telegraf",
|
||||||
OverwriteTemplate: false,
|
OverwriteTemplate: false,
|
||||||
|
|
@ -50,6 +53,7 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
|
||||||
URLs: urls,
|
URLs: urls,
|
||||||
IndexName: "test-%Y.%m.%d",
|
IndexName: "test-%Y.%m.%d",
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
EnableGzip: true,
|
||||||
ManageTemplate: true,
|
ManageTemplate: true,
|
||||||
TemplateName: "",
|
TemplateName: "",
|
||||||
OverwriteTemplate: true,
|
OverwriteTemplate: true,
|
||||||
|
|
@ -70,6 +74,7 @@ func TestTemplateManagementIntegration(t *testing.T) {
|
||||||
URLs: urls,
|
URLs: urls,
|
||||||
IndexName: "test-%Y.%m.%d",
|
IndexName: "test-%Y.%m.%d",
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
EnableGzip: true,
|
||||||
ManageTemplate: true,
|
ManageTemplate: true,
|
||||||
TemplateName: "telegraf",
|
TemplateName: "telegraf",
|
||||||
OverwriteTemplate: true,
|
OverwriteTemplate: true,
|
||||||
|
|
@ -96,6 +101,7 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) {
|
||||||
URLs: urls,
|
URLs: urls,
|
||||||
IndexName: "{{host}}-%Y.%m.%d",
|
IndexName: "{{host}}-%Y.%m.%d",
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
EnableGzip: true,
|
||||||
ManageTemplate: true,
|
ManageTemplate: true,
|
||||||
TemplateName: "telegraf",
|
TemplateName: "telegraf",
|
||||||
OverwriteTemplate: true,
|
OverwriteTemplate: true,
|
||||||
|
|
@ -254,3 +260,70 @@ func TestGetIndexName(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/_bulk":
|
||||||
|
require.Equal(t, "gzip", r.Header.Get("Content-Encoding"))
|
||||||
|
require.Equal(t, "gzip", r.Header.Get("Accept-Encoding"))
|
||||||
|
_, err := w.Write([]byte("{}"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
_, err := w.Write([]byte(`{"version": {"number": "7.8"}}`))
|
||||||
|
require.NoError(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
urls := []string{"http://" + ts.Listener.Addr().String()}
|
||||||
|
|
||||||
|
e := &Elasticsearch{
|
||||||
|
URLs: urls,
|
||||||
|
IndexName: "{{host}}-%Y.%m.%d",
|
||||||
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
EnableGzip: true,
|
||||||
|
ManageTemplate: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := e.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = e.Write(testutil.MockMetrics())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRequestHeaderWhenGzipIsDisabled(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/_bulk":
|
||||||
|
require.NotEqual(t, "gzip", r.Header.Get("Content-Encoding"))
|
||||||
|
_, err := w.Write([]byte("{}"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
_, err := w.Write([]byte(`{"version": {"number": "7.8"}}`))
|
||||||
|
require.NoError(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
urls := []string{"http://" + ts.Listener.Addr().String()}
|
||||||
|
|
||||||
|
e := &Elasticsearch{
|
||||||
|
URLs: urls,
|
||||||
|
IndexName: "{{host}}-%Y.%m.%d",
|
||||||
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
EnableGzip: false,
|
||||||
|
ManageTemplate: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := e.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = e.Write(testutil.MockMetrics())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue