Add content_encoding option to kinesis_consumer input (#8891)
This commit is contained in:
parent
d5f79093f4
commit
30830c2ec2
|
|
@ -54,6 +54,15 @@ and creates metrics using one of the supported [input data formats][].
|
|||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "influx"
|
||||
|
||||
##
|
||||
## The content encoding of the data from kinesis
|
||||
## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
|
||||
## as AWS compresses cloudwatch log data before it is sent to kinesis (AWS
|
||||
## also base64 encodes the gzip byte data before pushing to the stream. The base64 decoding
|
||||
## is done automatically by the golang sdk, as data is read from kinesis)
|
||||
##
|
||||
# content_encoding = "identity"
|
||||
|
||||
## Optional
|
||||
## Configuration for a dynamodb checkpoint
|
||||
[inputs.kinesis_consumer.checkpoint_dynamodb]
|
||||
|
|
|
|||
|
|
@ -1,8 +1,12 @@
|
|||
package kinesis_consumer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"compress/zlib"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"strings"
|
||||
"sync"
|
||||
|
|
@ -38,6 +42,7 @@ type (
|
|||
ShardIteratorType string `toml:"shard_iterator_type"`
|
||||
DynamoDB *DynamoDB `toml:"checkpoint_dynamodb"`
|
||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||
ContentEncoding string `toml:"content_encoding"`
|
||||
|
||||
Log telegraf.Logger
|
||||
|
||||
|
|
@ -55,6 +60,8 @@ type (
|
|||
recordsTex sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
|
||||
processContentEncodingFunc processContent
|
||||
|
||||
lastSeqNum *big.Int
|
||||
}
|
||||
|
||||
|
|
@ -68,6 +75,8 @@ const (
|
|||
defaultMaxUndeliveredMessages = 1000
|
||||
)
|
||||
|
||||
// processContent takes a raw record payload and returns the payload to hand
// to the parser, or an error when the payload cannot be decoded.
type processContent func([]byte) ([]byte, error)
|
||||
|
||||
// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
|
||||
var maxSeq = strToBint(strings.Repeat("9", 129))
|
||||
|
||||
|
|
@ -118,6 +127,15 @@ var sampleConfig = `
|
|||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "influx"
|
||||
|
||||
##
|
||||
## The content encoding of the data from kinesis
|
||||
## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
|
||||
## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
|
||||
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
|
||||
## is done automatically by the golang sdk, as data is read from kinesis)
|
||||
##
|
||||
# content_encoding = "identity"
|
||||
|
||||
## Optional
|
||||
## Configuration for a dynamodb checkpoint
|
||||
[inputs.kinesis_consumer.checkpoint_dynamodb]
|
||||
|
|
@ -239,7 +257,11 @@ func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
|
|||
}
|
||||
|
||||
func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
|
||||
metrics, err := k.parser.Parse(r.Data)
|
||||
data, err := k.processContentEncodingFunc(r.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metrics, err := k.parser.Parse(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -334,6 +356,46 @@ func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error
|
|||
return nil
|
||||
}
|
||||
|
||||
// processGzip decompresses a gzip-encoded payload and returns the decoded
// bytes.
//
// An error is returned when data does not begin with a valid gzip header or
// when the compressed stream is corrupt or truncated. Errors are wrapped with
// the step that failed so the cause can still be matched with errors.Is, and
// no partial output is returned alongside an error.
func processGzip(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("opening gzip reader: %w", err)
	}
	defer zr.Close()

	decoded, err := ioutil.ReadAll(zr)
	if err != nil {
		// Drop whatever was read so far: a half-decoded payload must not
		// be mistaken for a usable result.
		return nil, fmt.Errorf("decompressing gzip data: %w", err)
	}
	return decoded, nil
}
|
||||
|
||||
// processZlib decompresses a zlib-encoded payload and returns the decoded
// bytes.
//
// An error is returned when data does not begin with a valid zlib header or
// when the compressed stream is corrupt or truncated. Errors are wrapped with
// the step that failed so the cause can still be matched with errors.Is, and
// no partial output is returned alongside an error.
func processZlib(data []byte) ([]byte, error) {
	zr, err := zlib.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("opening zlib reader: %w", err)
	}
	defer zr.Close()

	decoded, err := ioutil.ReadAll(zr)
	if err != nil {
		// Drop whatever was read so far: a half-decoded payload must not
		// be mistaken for a usable result.
		return nil, fmt.Errorf("decompressing zlib data: %w", err)
	}
	return decoded, nil
}
|
||||
|
||||
// processNoOp is the pass-through decoder: it hands back the payload it was
// given, untouched, and never fails.
func processNoOp(data []byte) ([]byte, error) {
	passthrough := data
	return passthrough, nil
}
|
||||
|
||||
func (k *KinesisConsumer) configureProcessContentEncodingFunc() error {
|
||||
switch k.ContentEncoding {
|
||||
case "gzip":
|
||||
k.processContentEncodingFunc = processGzip
|
||||
case "zlib":
|
||||
k.processContentEncodingFunc = processZlib
|
||||
case "none", "identity", "":
|
||||
k.processContentEncodingFunc = processNoOp
|
||||
default:
|
||||
return fmt.Errorf("unknown content encoding %q", k.ContentEncoding)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KinesisConsumer) Init() error {
|
||||
return k.configureProcessContentEncodingFunc()
|
||||
}
|
||||
|
||||
type noopCheckpoint struct{}
|
||||
|
||||
func (n noopCheckpoint) Set(string, string, string) error { return nil }
|
||||
|
|
@ -347,6 +409,7 @@ func init() {
|
|||
ShardIteratorType: "TRIM_HORIZON",
|
||||
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
|
||||
lastSeqNum: maxSeq,
|
||||
ContentEncoding: "identity",
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,177 @@
|
|||
package kinesis_consumer
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
consumer "github.com/harlow/kinesis-consumer"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestKinesisConsumer_onMessage(t *testing.T) {
|
||||
zlibBytpes, _ := base64.StdEncoding.DecodeString("eF5FjlFrgzAUhf9KuM+2aNB2zdsQ2xe3whQGW8qIeqdhaiSJK0P874u1Y4+Hc/jON0GHxoga858BgUF8fs5fzunHU5Jlj6cEPFDXHvXStGqsrsKWTapq44pW1SetxsF1a8qsRtGt0YyFKbUcrFT9UbYWtQH2frntkm/s7RInkNU6t9JpWNE5WBAFPo3CcHeg+9D703OziUOhCg6MQ/yakrspuZsyEjdYfsm+Jg2K1jZEfZLKQWUvFglylBobZXDLwSP8//EGpD4NNj7dUJpT6hQY3W33h/AhCt84zDBf5l/MDl08")
|
||||
gzippedBytes, _ := base64.StdEncoding.DecodeString("H4sIAAFXNGAAA0WOUWuDMBSF/0q4z7Zo0HbN2xDbF7fCFAZbyoh6p2FqJIkrQ/zvi7Vjj4dz+M43QYfGiBrznwGBQXx+zl/O6cdTkmWPpwQ8UNce9dK0aqyuwpZNqmrjilbVJ63GwXVryqxG0a3RjIUptRysVP1Rtha1AfZ+ue2Sb+ztEieQ1Tq30mlY0TlYEAU+jcJwd6D70PvTc7OJQ6EKDoxD/JqSuym5mzISN1h+yb4mDYrWNkR9kspBZS8WCXKUGhtlcMvBI/z/8QakPg02Pt1QmlPqFBjdbfeH8CEK3zjMMF/mX0TaxZUpAQAA")
|
||||
notZippedBytes := []byte(`{"messageType":"CONTROL_MESSAGE","owner":"CloudwatchLogs","logGroup":"","logStream":"",
|
||||
"subscriptionFilters":[],"logEvents":[
|
||||
{"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"},
|
||||
{"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"}
|
||||
]}`)
|
||||
parser, _ := json.New(&json.Config{
|
||||
MetricName: "json_test",
|
||||
Query: "logEvents",
|
||||
StringFields: []string{"message"},
|
||||
})
|
||||
|
||||
type fields struct {
|
||||
ContentEncoding string
|
||||
parser parsers.Parser
|
||||
records map[telegraf.TrackingID]string
|
||||
}
|
||||
type args struct {
|
||||
r *consumer.Record
|
||||
}
|
||||
type expected struct {
|
||||
numberOfMetrics int
|
||||
messageContains string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
expected expected
|
||||
}{
|
||||
{
|
||||
name: "test no compression",
|
||||
fields: fields{
|
||||
ContentEncoding: "none",
|
||||
parser: parser,
|
||||
records: make(map[telegraf.TrackingID]string),
|
||||
},
|
||||
args: args{
|
||||
r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")},
|
||||
},
|
||||
wantErr: false,
|
||||
expected: expected{
|
||||
messageContains: "bob",
|
||||
numberOfMetrics: 2,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test no compression via empty string for ContentEncoding",
|
||||
fields: fields{
|
||||
ContentEncoding: "",
|
||||
parser: parser,
|
||||
records: make(map[telegraf.TrackingID]string),
|
||||
},
|
||||
args: args{
|
||||
r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")},
|
||||
},
|
||||
wantErr: false,
|
||||
expected: expected{
|
||||
messageContains: "bob",
|
||||
numberOfMetrics: 2,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test no compression via identity ContentEncoding",
|
||||
fields: fields{
|
||||
ContentEncoding: "identity",
|
||||
parser: parser,
|
||||
records: make(map[telegraf.TrackingID]string),
|
||||
},
|
||||
args: args{
|
||||
r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")},
|
||||
},
|
||||
wantErr: false,
|
||||
expected: expected{
|
||||
messageContains: "bob",
|
||||
numberOfMetrics: 2,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test no compression via no ContentEncoding",
|
||||
fields: fields{
|
||||
parser: parser,
|
||||
records: make(map[telegraf.TrackingID]string),
|
||||
},
|
||||
args: args{
|
||||
r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")},
|
||||
},
|
||||
wantErr: false,
|
||||
expected: expected{
|
||||
messageContains: "bob",
|
||||
numberOfMetrics: 2,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test gzip compression",
|
||||
fields: fields{
|
||||
ContentEncoding: "gzip",
|
||||
parser: parser,
|
||||
records: make(map[telegraf.TrackingID]string),
|
||||
},
|
||||
args: args{
|
||||
r: &consumer.Record{Data: gzippedBytes, SequenceNumber: aws.String("anything")},
|
||||
},
|
||||
wantErr: false,
|
||||
expected: expected{
|
||||
messageContains: "bob",
|
||||
numberOfMetrics: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test zlib compression",
|
||||
fields: fields{
|
||||
ContentEncoding: "zlib",
|
||||
parser: parser,
|
||||
records: make(map[telegraf.TrackingID]string),
|
||||
},
|
||||
args: args{
|
||||
r: &consumer.Record{Data: zlibBytpes, SequenceNumber: aws.String("anything")},
|
||||
},
|
||||
wantErr: false,
|
||||
expected: expected{
|
||||
messageContains: "bob",
|
||||
numberOfMetrics: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
k := &KinesisConsumer{
|
||||
ContentEncoding: "notsupported",
|
||||
}
|
||||
err := k.Init()
|
||||
assert.NotNil(t, err)
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
k := &KinesisConsumer{
|
||||
ContentEncoding: tt.fields.ContentEncoding,
|
||||
parser: tt.fields.parser,
|
||||
records: tt.fields.records,
|
||||
}
|
||||
err := k.Init()
|
||||
assert.Nil(t, err)
|
||||
|
||||
acc := testutil.Accumulator{}
|
||||
if err := k.onMessage(acc.WithTracking(tt.expected.numberOfMetrics), tt.args.r); (err != nil) != tt.wantErr {
|
||||
t.Errorf("onMessage() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
|
||||
assert.Equal(t, tt.expected.numberOfMetrics, len(acc.Metrics))
|
||||
|
||||
for _, metric := range acc.Metrics {
|
||||
if logEventMessage, ok := metric.Fields["message"]; ok {
|
||||
assert.Contains(t, logEventMessage.(string), tt.expected.messageContains)
|
||||
} else {
|
||||
t.Errorf("Expect logEvents to be present")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue