chore: Enable G110 rule for gosec (#13044)
Co-authored-by: Pawel Zak <Pawel Zak>
This commit is contained in:
parent
596ecc4a67
commit
ba16eeb495
|
|
@ -95,6 +95,7 @@ linters-settings:
|
||||||
- G107
|
- G107
|
||||||
- G108
|
- G108
|
||||||
- G109
|
- G109
|
||||||
|
- G110
|
||||||
- G111
|
- G111
|
||||||
- G112
|
- G112
|
||||||
- G114
|
- G114
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,12 @@ import (
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"compress/zlib"
|
"compress/zlib"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const DefaultMaxDecompressionSize = 500 * 1024 * 1024 //500MB
|
||||||
|
|
||||||
// NewStreamContentDecoder returns a reader that will decode the stream
|
// NewStreamContentDecoder returns a reader that will decode the stream
|
||||||
// according to the encoding type.
|
// according to the encoding type.
|
||||||
func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
|
func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
|
||||||
|
|
@ -92,11 +95,11 @@ func (a *AutoDecoder) SetEncoding(encoding string) {
|
||||||
a.encoding = encoding
|
a.encoding = encoding
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AutoDecoder) Decode(data []byte) ([]byte, error) {
|
func (a *AutoDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) {
|
||||||
if a.encoding == "gzip" {
|
if a.encoding == "gzip" {
|
||||||
return a.gzip.Decode(data)
|
return a.gzip.Decode(data, maxDecompressionSize)
|
||||||
}
|
}
|
||||||
return a.identity.Decode(data)
|
return a.identity.Decode(data, maxDecompressionSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAutoContentDecoder() *AutoDecoder {
|
func NewAutoContentDecoder() *AutoDecoder {
|
||||||
|
|
@ -199,7 +202,7 @@ func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
|
||||||
// ContentDecoder removes a wrapper encoding from byte buffers.
|
// ContentDecoder removes a wrapper encoding from byte buffers.
|
||||||
type ContentDecoder interface {
|
type ContentDecoder interface {
|
||||||
SetEncoding(string)
|
SetEncoding(string)
|
||||||
Decode([]byte) ([]byte, error)
|
Decode([]byte, int64) ([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GzipDecoder decompresses buffers with gzip compression.
|
// GzipDecoder decompresses buffers with gzip compression.
|
||||||
|
|
@ -217,17 +220,20 @@ func NewGzipDecoder() *GzipDecoder {
|
||||||
|
|
||||||
func (*GzipDecoder) SetEncoding(string) {}
|
func (*GzipDecoder) SetEncoding(string) {}
|
||||||
|
|
||||||
func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
|
func (d *GzipDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) {
|
||||||
err := d.reader.Reset(bytes.NewBuffer(data))
|
err := d.reader.Reset(bytes.NewBuffer(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.buf.Reset()
|
d.buf.Reset()
|
||||||
|
|
||||||
_, err = d.buf.ReadFrom(d.reader)
|
n, err := io.CopyN(d.buf, d.reader, maxDecompressionSize)
|
||||||
if err != nil && !errors.Is(err, io.EOF) {
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
} else if n == maxDecompressionSize {
|
||||||
|
return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", maxDecompressionSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = d.reader.Close()
|
err = d.reader.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -247,7 +253,7 @@ func NewZlibDecoder() *ZlibDecoder {
|
||||||
|
|
||||||
func (*ZlibDecoder) SetEncoding(string) {}
|
func (*ZlibDecoder) SetEncoding(string) {}
|
||||||
|
|
||||||
func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) {
|
func (d *ZlibDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) {
|
||||||
d.buf.Reset()
|
d.buf.Reset()
|
||||||
|
|
||||||
b := bytes.NewBuffer(data)
|
b := bytes.NewBuffer(data)
|
||||||
|
|
@ -255,10 +261,14 @@ func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
_, err = io.Copy(d.buf, r)
|
|
||||||
|
n, err := io.CopyN(d.buf, r, maxDecompressionSize)
|
||||||
if err != nil && !errors.Is(err, io.EOF) {
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
} else if n == maxDecompressionSize {
|
||||||
|
return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", maxDecompressionSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.Close()
|
err = r.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -275,6 +285,10 @@ func NewIdentityDecoder() *IdentityDecoder {
|
||||||
|
|
||||||
func (*IdentityDecoder) SetEncoding(string) {}
|
func (*IdentityDecoder) SetEncoding(string) {}
|
||||||
|
|
||||||
func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
|
func (*IdentityDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) {
|
||||||
|
size := int64(len(data))
|
||||||
|
if size > maxDecompressionSize {
|
||||||
|
return nil, fmt.Errorf("size of decoded data: %d exceeds allowed size %d", size, maxDecompressionSize)
|
||||||
|
}
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const maxDecompressionSize = 1024
|
||||||
|
|
||||||
func TestGzipEncodeDecode(t *testing.T) {
|
func TestGzipEncodeDecode(t *testing.T) {
|
||||||
enc := NewGzipEncoder()
|
enc := NewGzipEncoder()
|
||||||
dec := NewGzipDecoder()
|
dec := NewGzipDecoder()
|
||||||
|
|
@ -15,7 +17,7 @@ func TestGzipEncodeDecode(t *testing.T) {
|
||||||
payload, err := enc.Encode([]byte("howdy"))
|
payload, err := enc.Encode([]byte("howdy"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual, err := dec.Decode(payload)
|
actual, err := dec.Decode(payload, maxDecompressionSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "howdy", string(actual))
|
require.Equal(t, "howdy", string(actual))
|
||||||
|
|
@ -28,7 +30,7 @@ func TestGzipReuse(t *testing.T) {
|
||||||
payload, err := enc.Encode([]byte("howdy"))
|
payload, err := enc.Encode([]byte("howdy"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual, err := dec.Decode(payload)
|
actual, err := dec.Decode(payload, maxDecompressionSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "howdy", string(actual))
|
require.Equal(t, "howdy", string(actual))
|
||||||
|
|
@ -36,7 +38,7 @@ func TestGzipReuse(t *testing.T) {
|
||||||
payload, err = enc.Encode([]byte("doody"))
|
payload, err = enc.Encode([]byte("doody"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual, err = dec.Decode(payload)
|
actual, err = dec.Decode(payload, maxDecompressionSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "doody", string(actual))
|
require.Equal(t, "doody", string(actual))
|
||||||
|
|
@ -49,12 +51,23 @@ func TestZlibEncodeDecode(t *testing.T) {
|
||||||
payload, err := enc.Encode([]byte("howdy"))
|
payload, err := enc.Encode([]byte("howdy"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual, err := dec.Decode(payload)
|
actual, err := dec.Decode(payload, maxDecompressionSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "howdy", string(actual))
|
require.Equal(t, "howdy", string(actual))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestZlibEncodeDecodeWithTooLargeMessage(t *testing.T) {
|
||||||
|
enc := NewZlibEncoder()
|
||||||
|
dec := NewZlibDecoder()
|
||||||
|
|
||||||
|
payload, err := enc.Encode([]byte("howdy"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = dec.Decode(payload, 3)
|
||||||
|
require.ErrorContains(t, err, "size of decoded data exceeds allowed size 3")
|
||||||
|
}
|
||||||
|
|
||||||
func TestIdentityEncodeDecode(t *testing.T) {
|
func TestIdentityEncodeDecode(t *testing.T) {
|
||||||
enc := NewIdentityEncoder()
|
enc := NewIdentityEncoder()
|
||||||
dec := NewIdentityDecoder()
|
dec := NewIdentityDecoder()
|
||||||
|
|
@ -62,7 +75,7 @@ func TestIdentityEncodeDecode(t *testing.T) {
|
||||||
payload, err := enc.Encode([]byte("howdy"))
|
payload, err := enc.Encode([]byte("howdy"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual, err := dec.Decode(payload)
|
actual, err := dec.Decode(payload, maxDecompressionSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "howdy", string(actual))
|
require.Equal(t, "howdy", string(actual))
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## - Use "auto" determine the encoding using the ContentEncoding header
|
## - Use "auto" determine the encoding using the ContentEncoding header
|
||||||
# content_encoding = "identity"
|
# content_encoding = "identity"
|
||||||
|
|
||||||
|
## Maximum size of decoded message.
|
||||||
|
## Acceptable units are B, KiB, KB, MiB, MB...
|
||||||
|
## Without quotes and units, interpreted as size in bytes.
|
||||||
|
# max_decompression_size = "500MB"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
|
@ -55,8 +56,9 @@ type AMQPConsumer struct {
|
||||||
AuthMethod string
|
AuthMethod string
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
||||||
ContentEncoding string `toml:"content_encoding"`
|
ContentEncoding string `toml:"content_encoding"`
|
||||||
Log telegraf.Logger
|
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
|
||||||
|
Log telegraf.Logger
|
||||||
|
|
||||||
deliveries map[telegraf.TrackingID]amqp.Delivery
|
deliveries map[telegraf.TrackingID]amqp.Delivery
|
||||||
|
|
||||||
|
|
@ -113,6 +115,10 @@ func (a *AMQPConsumer) Init() error {
|
||||||
a.MaxUndeliveredMessages = 1000
|
a.MaxUndeliveredMessages = 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if a.MaxDecompressionSize <= 0 {
|
||||||
|
a.MaxDecompressionSize = internal.DefaultMaxDecompressionSize
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -144,11 +150,11 @@ func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
config := amqp.Config{
|
amqpConfig := amqp.Config{
|
||||||
TLSClientConfig: tlsCfg,
|
TLSClientConfig: tlsCfg,
|
||||||
SASL: auth, // if nil, it will be PLAIN
|
SASL: auth, // if nil, it will be PLAIN
|
||||||
}
|
}
|
||||||
return &config, nil
|
return &amqpConfig, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start satisfies the telegraf.ServiceInput interface
|
// Start satisfies the telegraf.ServiceInput interface
|
||||||
|
|
@ -412,7 +418,7 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
|
||||||
}
|
}
|
||||||
|
|
||||||
a.decoder.SetEncoding(d.ContentEncoding)
|
a.decoder.SetEncoding(d.ContentEncoding)
|
||||||
body, err := a.decoder.Decode(d.Body)
|
body, err := a.decoder.Decode(d.Body, int64(a.MaxDecompressionSize))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
onError()
|
onError()
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ func TestAutoEncoding(t *testing.T) {
|
||||||
a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery)
|
a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery)
|
||||||
a.parser = parser
|
a.parser = parser
|
||||||
a.decoder, err = internal.NewContentDecoder("auto")
|
a.decoder, err = internal.NewContentDecoder("auto")
|
||||||
|
a.MaxDecompressionSize = internal.DefaultMaxDecompressionSize
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,11 @@
|
||||||
## - Use "auto" determine the encoding using the ContentEncoding header
|
## - Use "auto" determine the encoding using the ContentEncoding header
|
||||||
# content_encoding = "identity"
|
# content_encoding = "identity"
|
||||||
|
|
||||||
|
## Maximum size of decoded message.
|
||||||
|
## Acceptable units are B, KiB, KB, MiB, MB...
|
||||||
|
## Without quotes and units, interpreted as size in bytes.
|
||||||
|
# max_decompression_size = "500MB"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
||||||
|
|
@ -89,6 +89,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## "identity" to apply no encoding.
|
## "identity" to apply no encoding.
|
||||||
# content_encoding = "identity"
|
# content_encoding = "identity"
|
||||||
|
|
||||||
|
## Maximum size of decoded packet.
|
||||||
|
## Acceptable units are B, KiB, KB, MiB, MB...
|
||||||
|
## Without quotes and units, interpreted as size in bytes.
|
||||||
|
# max_decompression_size = "500MB"
|
||||||
|
|
||||||
## Message splitting strategy and corresponding settings for stream sockets
|
## Message splitting strategy and corresponding settings for stream sockets
|
||||||
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
|
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
|
||||||
## listeners such as udp.
|
## listeners such as udp.
|
||||||
|
|
|
||||||
|
|
@ -14,11 +14,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type packetListener struct {
|
type packetListener struct {
|
||||||
Encoding string
|
Encoding string
|
||||||
SocketMode string
|
MaxDecompressionSize int64
|
||||||
ReadBufferSize int
|
SocketMode string
|
||||||
Parser telegraf.Parser
|
ReadBufferSize int
|
||||||
Log telegraf.Logger
|
Parser telegraf.Parser
|
||||||
|
Log telegraf.Logger
|
||||||
|
|
||||||
conn net.PacketConn
|
conn net.PacketConn
|
||||||
decoder internal.ContentDecoder
|
decoder internal.ContentDecoder
|
||||||
|
|
@ -36,7 +37,7 @@ func (l *packetListener) listen(acc telegraf.Accumulator) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
body, err := l.decoder.Decode(buf[:n])
|
body, err := l.decoder.Decode(buf[:n], l.MaxDecompressionSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
acc.AddError(fmt.Errorf("unable to decode incoming packet: %w", err))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,11 @@
|
||||||
## "identity" to apply no encoding.
|
## "identity" to apply no encoding.
|
||||||
# content_encoding = "identity"
|
# content_encoding = "identity"
|
||||||
|
|
||||||
|
## Maximum size of decoded packet.
|
||||||
|
## Acceptable units are B, KiB, KB, MiB, MB...
|
||||||
|
## Without quotes and units, interpreted as size in bytes.
|
||||||
|
# max_decompression_size = "500MB"
|
||||||
|
|
||||||
## Message splitting strategy and corresponding settings for stream sockets
|
## Message splitting strategy and corresponding settings for stream sockets
|
||||||
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
|
## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet
|
||||||
## listeners such as udp.
|
## listeners such as udp.
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
|
@ -45,6 +46,7 @@ type SocketListener struct {
|
||||||
KeepAlivePeriod *config.Duration `toml:"keep_alive_period"`
|
KeepAlivePeriod *config.Duration `toml:"keep_alive_period"`
|
||||||
SocketMode string `toml:"socket_mode"`
|
SocketMode string `toml:"socket_mode"`
|
||||||
ContentEncoding string `toml:"content_encoding"`
|
ContentEncoding string `toml:"content_encoding"`
|
||||||
|
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
|
||||||
SplittingStrategy string `toml:"splitting_strategy"`
|
SplittingStrategy string `toml:"splitting_strategy"`
|
||||||
SplittingDelimiter string `toml:"splitting_delimiter"`
|
SplittingDelimiter string `toml:"splitting_delimiter"`
|
||||||
SplittingLength int `toml:"splitting_length"`
|
SplittingLength int `toml:"splitting_length"`
|
||||||
|
|
@ -159,6 +161,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
return fmt.Errorf("parsing address failed: %w", err)
|
return fmt.Errorf("parsing address failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sl.MaxDecompressionSize <= 0 {
|
||||||
|
sl.MaxDecompressionSize = internal.DefaultMaxDecompressionSize
|
||||||
|
}
|
||||||
|
|
||||||
switch u.Scheme {
|
switch u.Scheme {
|
||||||
case "tcp", "tcp4", "tcp6":
|
case "tcp", "tcp4", "tcp6":
|
||||||
ssl := &streamListener{
|
ssl := &streamListener{
|
||||||
|
|
@ -195,8 +201,9 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
case "udp", "udp4", "udp6":
|
case "udp", "udp4", "udp6":
|
||||||
psl := &packetListener{
|
psl := &packetListener{
|
||||||
Encoding: sl.ContentEncoding,
|
Encoding: sl.ContentEncoding,
|
||||||
Parser: sl.parser,
|
MaxDecompressionSize: int64(sl.MaxDecompressionSize),
|
||||||
|
Parser: sl.parser,
|
||||||
}
|
}
|
||||||
if err := psl.setupUDP(u, ifname, int(sl.ReadBufferSize)); err != nil {
|
if err := psl.setupUDP(u, ifname, int(sl.ReadBufferSize)); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -204,8 +211,9 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
sl.listener = psl
|
sl.listener = psl
|
||||||
case "ip", "ip4", "ip6":
|
case "ip", "ip4", "ip6":
|
||||||
psl := &packetListener{
|
psl := &packetListener{
|
||||||
Encoding: sl.ContentEncoding,
|
Encoding: sl.ContentEncoding,
|
||||||
Parser: sl.parser,
|
MaxDecompressionSize: int64(sl.MaxDecompressionSize),
|
||||||
|
Parser: sl.parser,
|
||||||
}
|
}
|
||||||
if err := psl.setupIP(u); err != nil {
|
if err := psl.setupIP(u); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -213,8 +221,9 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
sl.listener = psl
|
sl.listener = psl
|
||||||
case "unixgram":
|
case "unixgram":
|
||||||
psl := &packetListener{
|
psl := &packetListener{
|
||||||
Encoding: sl.ContentEncoding,
|
Encoding: sl.ContentEncoding,
|
||||||
Parser: sl.parser,
|
MaxDecompressionSize: int64(sl.MaxDecompressionSize),
|
||||||
|
Parser: sl.parser,
|
||||||
}
|
}
|
||||||
if err := psl.setupUnixgram(u, sl.SocketMode); err != nil {
|
if err := psl.setupUnixgram(u, sl.SocketMode); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"compress/zlib"
|
"compress/zlib"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
|
@ -150,11 +151,15 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, namefieldnoprefix bool) string
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var maxDecompressionSize int64 = 500 * 1024 * 1024
|
||||||
bufW := bytes.NewBuffer(nil)
|
bufW := bytes.NewBuffer(nil)
|
||||||
_, err = io.Copy(bufW, r)
|
written, err := io.CopyN(bufW, r, maxDecompressionSize)
|
||||||
if err != nil {
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
return err
|
return err
|
||||||
|
} else if written == maxDecompressionSize {
|
||||||
|
return fmt.Errorf("size of decoded data exceeds allowed size %d", maxDecompressionSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.Close()
|
err = r.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
|
@ -72,8 +73,13 @@ func TestWrite(t *testing.T) {
|
||||||
gz, err := gzip.NewReader(r.Body)
|
gz, err := gzip.NewReader(r.Body)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = io.Copy(&body, gz)
|
var maxDecompressionSize int64 = 500 * 1024 * 1024
|
||||||
|
n, err := io.CopyN(&body, gz, maxDecompressionSize)
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NotEqualf(t, n, maxDecompressionSize, "size of decoded data exceeds allowed size %d", maxDecompressionSize)
|
||||||
|
|
||||||
var lm Metric
|
var lm Metric
|
||||||
err = json.Unmarshal(body.Bytes(), &lm)
|
err = json.Unmarshal(body.Bytes(), &lm)
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,8 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
|
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getMetric() telegraf.Metric {
|
func getMetric() telegraf.Metric {
|
||||||
|
|
@ -247,8 +248,15 @@ func TestContentType(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
gz, err := gzip.NewReader(r.Body)
|
gz, err := gzip.NewReader(r.Body)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, err = io.Copy(&body, gz)
|
|
||||||
|
var maxDecompressionSize int64 = 500 * 1024 * 1024
|
||||||
|
n, err := io.CopyN(&body, gz, maxDecompressionSize)
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NotEqualf(t, n, maxDecompressionSize, "size of decoded data exceeds allowed size %d", maxDecompressionSize)
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue