feat(outputs.file): Add compression (#13245)
This commit is contained in:
parent
c34a1f70d9
commit
7aa3d79631
|
|
@ -41,4 +41,17 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## more about them here:
|
## more about them here:
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
data_format = "influx"
|
data_format = "influx"
|
||||||
|
|
||||||
|
## Compress output data with the specifed algorithm.
|
||||||
|
## If empty, compression will be disabled and files will be plain text.
|
||||||
|
## Supported algorithms are "zstd", "gzip" and "zlib".
|
||||||
|
# compression_algorithm = ""
|
||||||
|
|
||||||
|
## Compression level for the algorithm above.
|
||||||
|
## Please note that different algorithms support different levels:
|
||||||
|
## zstd -- supports levels 1, 3, 7 and 11.
|
||||||
|
## gzip -- supports levels 0, 1 and 9.
|
||||||
|
## zlib -- supports levels 0, 1, and 9.
|
||||||
|
## By default the default compression level for each algorithm is used.
|
||||||
|
# compression_level = -1
|
||||||
```
|
```
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/internal/rotate"
|
"github.com/influxdata/telegraf/internal/rotate"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
|
|
@ -19,13 +20,16 @@ import (
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
type File struct {
|
type File struct {
|
||||||
Files []string `toml:"files"`
|
Files []string `toml:"files"`
|
||||||
RotationInterval config.Duration `toml:"rotation_interval"`
|
RotationInterval config.Duration `toml:"rotation_interval"`
|
||||||
RotationMaxSize config.Size `toml:"rotation_max_size"`
|
RotationMaxSize config.Size `toml:"rotation_max_size"`
|
||||||
RotationMaxArchives int `toml:"rotation_max_archives"`
|
RotationMaxArchives int `toml:"rotation_max_archives"`
|
||||||
UseBatchFormat bool `toml:"use_batch_format"`
|
UseBatchFormat bool `toml:"use_batch_format"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
CompressionAlgorithm string `toml:"compression_algorithm"`
|
||||||
|
CompressionLevel int `toml:"compression_level"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
|
encoder internal.ContentEncoder
|
||||||
writer io.Writer
|
writer io.Writer
|
||||||
closers []io.Closer
|
closers []io.Closer
|
||||||
serializer serializers.Serializer
|
serializer serializers.Serializer
|
||||||
|
|
@ -39,13 +43,28 @@ func (f *File) SetSerializer(serializer serializers.Serializer) {
|
||||||
f.serializer = serializer
|
f.serializer = serializer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *File) Connect() error {
|
func (f *File) Init() error {
|
||||||
writers := []io.Writer{}
|
var err error
|
||||||
|
|
||||||
if len(f.Files) == 0 {
|
if len(f.Files) == 0 {
|
||||||
f.Files = []string{"stdout"}
|
f.Files = []string{"stdout"}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var options []internal.EncodingOption
|
||||||
|
if f.CompressionAlgorithm == "" {
|
||||||
|
f.CompressionAlgorithm = "identity"
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.CompressionLevel >= 0 {
|
||||||
|
options = append(options, internal.WithCompressionLevel(f.CompressionLevel))
|
||||||
|
}
|
||||||
|
f.encoder, err = internal.NewContentEncoder(f.CompressionAlgorithm, options...)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *File) Connect() error {
|
||||||
|
var writers []io.Writer
|
||||||
|
|
||||||
for _, file := range f.Files {
|
for _, file := range f.Files {
|
||||||
if file == "stdout" {
|
if file == "stdout" {
|
||||||
writers = append(writers, os.Stdout)
|
writers = append(writers, os.Stdout)
|
||||||
|
|
@ -84,6 +103,11 @@ func (f *File) Write(metrics []telegraf.Metric) error {
|
||||||
f.Log.Errorf("Could not serialize metric: %v", err)
|
f.Log.Errorf("Could not serialize metric: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
octets, err = f.encoder.Encode(octets)
|
||||||
|
if err != nil {
|
||||||
|
f.Log.Errorf("Could not compress metrics: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
_, err = f.writer.Write(octets)
|
_, err = f.writer.Write(octets)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.Log.Errorf("Error writing to file: %v", err)
|
f.Log.Errorf("Error writing to file: %v", err)
|
||||||
|
|
@ -95,6 +119,11 @@ func (f *File) Write(metrics []telegraf.Metric) error {
|
||||||
f.Log.Debugf("Could not serialize metric: %v", err)
|
f.Log.Debugf("Could not serialize metric: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b, err = f.encoder.Encode(b)
|
||||||
|
if err != nil {
|
||||||
|
f.Log.Errorf("Could not compress metrics: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
_, err = f.writer.Write(b)
|
_, err = f.writer.Write(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr = fmt.Errorf("failed to write message: %w", err)
|
writeErr = fmt.Errorf("failed to write message: %w", err)
|
||||||
|
|
@ -107,6 +136,8 @@ func (f *File) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("file", func() telegraf.Output {
|
outputs.Add("file", func() telegraf.Output {
|
||||||
return &File{}
|
return &File{
|
||||||
|
CompressionLevel: -1,
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ const (
|
||||||
expNewFile = "test1,tag1=value1 value=1 1257894000000000000\n"
|
expNewFile = "test1,tag1=value1 value=1 1257894000000000000\n"
|
||||||
expExistFile = "cpu,cpu=cpu0 value=100 1455312810012459582\n" +
|
expExistFile = "cpu,cpu=cpu0 value=100 1455312810012459582\n" +
|
||||||
"test1,tag1=value1 value=1 1257894000000000000\n"
|
"test1,tag1=value1 value=1 1257894000000000000\n"
|
||||||
|
maxDecompressionSize = 1024 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFileExistingFile(t *testing.T) {
|
func TestFileExistingFile(t *testing.T) {
|
||||||
|
|
@ -26,14 +27,15 @@ func TestFileExistingFile(t *testing.T) {
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{fh.Name()},
|
Files: []string{fh.Name()},
|
||||||
serializer: s,
|
serializer: s,
|
||||||
|
CompressionLevel: -1,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
require.NoError(t, f.Init())
|
||||||
require.NoError(t, err)
|
require.NoError(t, f.Connect())
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err := f.Write(testutil.MockMetrics())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
validateFile(t, fh.Name(), expExistFile)
|
validateFile(t, fh.Name(), expExistFile)
|
||||||
|
|
@ -48,11 +50,14 @@ func TestFileNewFile(t *testing.T) {
|
||||||
|
|
||||||
fh := tmpFile(t)
|
fh := tmpFile(t)
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{fh},
|
Files: []string{fh},
|
||||||
serializer: s,
|
serializer: s,
|
||||||
|
CompressionLevel: -1,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = f.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
|
|
@ -73,11 +78,14 @@ func TestFileExistingFiles(t *testing.T) {
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{fh1.Name(), fh2.Name(), fh3.Name()},
|
Files: []string{fh1.Name(), fh2.Name(), fh3.Name()},
|
||||||
serializer: s,
|
serializer: s,
|
||||||
|
CompressionLevel: -1,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = f.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
|
|
@ -91,6 +99,84 @@ func TestFileExistingFiles(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewGzipCompressedFiles(t *testing.T) {
|
||||||
|
s := &influx.Serializer{}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
|
fh1 := tmpFile(t)
|
||||||
|
fh2 := tmpFile(t)
|
||||||
|
fh3 := tmpFile(t)
|
||||||
|
f := File{
|
||||||
|
Files: []string{fh1, fh2, fh3},
|
||||||
|
serializer: s,
|
||||||
|
CompressionAlgorithm: "gzip",
|
||||||
|
CompressionLevel: -1,
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, f.Init())
|
||||||
|
require.NoError(t, f.Connect())
|
||||||
|
|
||||||
|
require.NoError(t, f.Write(testutil.MockMetrics()))
|
||||||
|
|
||||||
|
validateGzipCompressedFile(t, fh1, expNewFile)
|
||||||
|
validateGzipCompressedFile(t, fh2, expNewFile)
|
||||||
|
validateGzipCompressedFile(t, fh3, expNewFile)
|
||||||
|
|
||||||
|
require.NoError(t, f.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewZlibCompressedFiles(t *testing.T) {
|
||||||
|
s := &influx.Serializer{}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
|
fh1 := tmpFile(t)
|
||||||
|
fh2 := tmpFile(t)
|
||||||
|
fh3 := tmpFile(t)
|
||||||
|
f := File{
|
||||||
|
Files: []string{fh1, fh2, fh3},
|
||||||
|
serializer: s,
|
||||||
|
CompressionAlgorithm: "zlib",
|
||||||
|
CompressionLevel: -1,
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, f.Init())
|
||||||
|
require.NoError(t, f.Connect())
|
||||||
|
|
||||||
|
require.NoError(t, f.Write(testutil.MockMetrics()))
|
||||||
|
|
||||||
|
validateZlibCompressedFile(t, fh1, expNewFile)
|
||||||
|
validateZlibCompressedFile(t, fh2, expNewFile)
|
||||||
|
validateZlibCompressedFile(t, fh3, expNewFile)
|
||||||
|
|
||||||
|
require.NoError(t, f.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewZstdCompressedFiles(t *testing.T) {
|
||||||
|
s := &influx.Serializer{}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
|
fh1 := tmpFile(t)
|
||||||
|
fh2 := tmpFile(t)
|
||||||
|
fh3 := tmpFile(t)
|
||||||
|
f := File{
|
||||||
|
Files: []string{fh1, fh2, fh3},
|
||||||
|
serializer: s,
|
||||||
|
CompressionAlgorithm: "zstd",
|
||||||
|
CompressionLevel: -1,
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, f.Init())
|
||||||
|
require.NoError(t, f.Connect())
|
||||||
|
|
||||||
|
require.NoError(t, f.Write(testutil.MockMetrics()))
|
||||||
|
|
||||||
|
validateZstdCompressedFile(t, fh1, expNewFile)
|
||||||
|
validateZstdCompressedFile(t, fh2, expNewFile)
|
||||||
|
validateZstdCompressedFile(t, fh3, expNewFile)
|
||||||
|
|
||||||
|
require.NoError(t, f.Close())
|
||||||
|
}
|
||||||
|
|
||||||
func TestFileNewFiles(t *testing.T) {
|
func TestFileNewFiles(t *testing.T) {
|
||||||
s := &influx.Serializer{}
|
s := &influx.Serializer{}
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
|
|
@ -99,11 +185,14 @@ func TestFileNewFiles(t *testing.T) {
|
||||||
fh2 := tmpFile(t)
|
fh2 := tmpFile(t)
|
||||||
fh3 := tmpFile(t)
|
fh3 := tmpFile(t)
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{fh1, fh2, fh3},
|
Files: []string{fh1, fh2, fh3},
|
||||||
serializer: s,
|
serializer: s,
|
||||||
|
CompressionLevel: -1,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = f.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
|
|
@ -125,11 +214,14 @@ func TestFileBoth(t *testing.T) {
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{fh1.Name(), fh2},
|
Files: []string{fh1.Name(), fh2},
|
||||||
serializer: s,
|
serializer: s,
|
||||||
|
CompressionLevel: -1,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = f.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
|
|
@ -152,11 +244,14 @@ func TestFileStdout(t *testing.T) {
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{"stdout"},
|
Files: []string{"stdout"},
|
||||||
serializer: s,
|
serializer: s,
|
||||||
|
CompressionLevel: -1,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = f.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
|
|
@ -208,3 +303,33 @@ func validateFile(t *testing.T, fileName, expS string) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expS, string(buf))
|
require.Equal(t, expS, string(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateZstdCompressedFile(t *testing.T, fileName, expS string) {
|
||||||
|
decoder, err := internal.NewContentDecoder("zstd", internal.WithMaxDecompressionSize(maxDecompressionSize))
|
||||||
|
require.NoError(t, err)
|
||||||
|
buf, err := os.ReadFile(fileName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
buf, err = decoder.Decode(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expS, string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateGzipCompressedFile(t *testing.T, fileName, expS string) {
|
||||||
|
buf, err := os.ReadFile(fileName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
rfr, err := internal.NewContentDecoder("gzip", internal.WithMaxDecompressionSize(maxDecompressionSize))
|
||||||
|
require.NoError(t, err)
|
||||||
|
buf, err = rfr.Decode(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expS, string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateZlibCompressedFile(t *testing.T, fileName, expS string) {
|
||||||
|
buf, err := os.ReadFile(fileName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
rfr, err := internal.NewContentDecoder("zlib", internal.WithMaxDecompressionSize(maxDecompressionSize))
|
||||||
|
require.NoError(t, err)
|
||||||
|
buf, err = rfr.Decode(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expS, string(buf))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,3 +25,16 @@
|
||||||
## more about them here:
|
## more about them here:
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
data_format = "influx"
|
data_format = "influx"
|
||||||
|
|
||||||
|
## Compress output data with the specifed algorithm.
|
||||||
|
## If empty, compression will be disabled and files will be plain text.
|
||||||
|
## Supported algorithms are "zstd", "gzip" and "zlib".
|
||||||
|
# compression_algorithm = ""
|
||||||
|
|
||||||
|
## Compression level for the algorithm above.
|
||||||
|
## Please note that different algorithms support different levels:
|
||||||
|
## zstd -- supports levels 1, 3, 7 and 11.
|
||||||
|
## gzip -- supports levels 0, 1 and 9.
|
||||||
|
## zlib -- supports levels 0, 1, and 9.
|
||||||
|
## By default the default compression level for each algorithm is used.
|
||||||
|
# compression_level = -1
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue