From 05435e47d325a8fd4bd2d49655650323fb060973 Mon Sep 17 00:00:00 2001
From: Sven Rebhan <36194019+srebhan@users.noreply.github.com>
Date: Wed, 11 Dec 2024 21:25:03 +0100
Subject: [PATCH] chore(inputs.kinesis_consumer): Cleanup code (#16267)

Co-authored-by: Dane Strandboge <136023093+DStrand1@users.noreply.github.com>
---
 plugins/inputs/kinesis_consumer/encoding.go   |  45 +++
 .../kinesis_consumer/kinesis_consumer.go      | 248 ++++++----------
 .../kinesis_consumer/kinesis_consumer_test.go | 277 +++++++-----------
 plugins/inputs/kinesis_consumer/logging.go    |  27 ++
 plugins/inputs/kinesis_consumer/noop_store.go |   7 +
 5 files changed, 273 insertions(+), 331 deletions(-)
 create mode 100644 plugins/inputs/kinesis_consumer/encoding.go
 create mode 100644 plugins/inputs/kinesis_consumer/logging.go
 create mode 100644 plugins/inputs/kinesis_consumer/noop_store.go

diff --git a/plugins/inputs/kinesis_consumer/encoding.go b/plugins/inputs/kinesis_consumer/encoding.go
new file mode 100644
index 000000000..d2bad6fd8
--- /dev/null
+++ b/plugins/inputs/kinesis_consumer/encoding.go
@@ -0,0 +1,45 @@
+package kinesis_consumer
+
+import (
+	"bytes"
+	"compress/gzip"
+	"compress/zlib"
+	"fmt"
+	"io"
+)
+
+type decodingFunc func([]byte) ([]byte, error)
+
+func processGzip(data []byte) ([]byte, error) {
+	zipData, err := gzip.NewReader(bytes.NewReader(data))
+	if err != nil {
+		return nil, err
+	}
+	defer zipData.Close()
+	return io.ReadAll(zipData)
+}
+
+func processZlib(data []byte) ([]byte, error) {
+	zlibData, err := zlib.NewReader(bytes.NewReader(data))
+	if err != nil {
+		return nil, err
+	}
+	defer zlibData.Close()
+	return io.ReadAll(zlibData)
+}
+
+func processNoOp(data []byte) ([]byte, error) {
+	return data, nil
+}
+
+func getDecodingFunc(encoding string) (decodingFunc, error) {
+	switch encoding {
+	case "gzip":
+		return processGzip, nil
+	case "zlib":
+		return processZlib, nil
+	case "none", "identity", "":
+		return processNoOp, nil
+	}
+	return nil, fmt.Errorf("unknown content encoding %q", encoding)
+}
diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer.go b/plugins/inputs/kinesis_consumer/kinesis_consumer.go
index 819a36b0a..4c65aadef 100644
--- a/plugins/inputs/kinesis_consumer/kinesis_consumer.go
+++ b/plugins/inputs/kinesis_consumer/kinesis_consumer.go
@@ -2,23 +2,15 @@
 package kinesis_consumer
 
 import (
-	"bytes"
-	"compress/gzip"
-	"compress/zlib"
 	"context"
 	_ "embed"
 	"errors"
-	"fmt"
-	"io"
-	"math/big"
-	"strings"
 	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
 	"github.com/aws/aws-sdk-go-v2/service/kinesis"
-	"github.com/aws/smithy-go/logging"
 	consumer "github.com/harlow/kinesis-consumer"
 	"github.com/harlow/kinesis-consumer/store/ddb"
 
@@ -31,86 +23,85 @@ import (
 //go:embed sample.conf
 var sampleConfig string
 
-var (
-	once sync.Once
-	// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
-	maxSeq = strToBint(strings.Repeat("9", 129))
-	negOne *big.Int
-)
+var once sync.Once
 
-const (
-	defaultMaxUndeliveredMessages = 1000
-)
+type KinesisConsumer struct {
+	StreamName             string          `toml:"streamname"`
+	ShardIteratorType      string          `toml:"shard_iterator_type"`
+	DynamoDB               *dynamoDB       `toml:"checkpoint_dynamodb"`
+	MaxUndeliveredMessages int             `toml:"max_undelivered_messages"`
+	ContentEncoding        string          `toml:"content_encoding"`
+	Log                    telegraf.Logger `toml:"-"`
+	common_aws.CredentialConfig
 
-type (
-	KinesisConsumer struct {
-		StreamName                string          `toml:"streamname"`
-		ShardIteratorType         string          `toml:"shard_iterator_type"`
-		DynamoDB                  *dynamoDB       `toml:"checkpoint_dynamodb"`
-		MaxUndeliveredMessages    int             `toml:"max_undelivered_messages"`
-		ContentEncoding           string          `toml:"content_encoding"`
+	cons   *consumer.Consumer
+	parser telegraf.Parser
+	cancel context.CancelFunc
+	acc    telegraf.TrackingAccumulator
+	sem    chan struct{}
 
-		Log telegraf.Logger `toml:"-"`
+	checkpoint    consumer.Store
+	checkpoints   map[string]checkpoint
+	records       map[telegraf.TrackingID]string
+	checkpointTex sync.Mutex
+	recordsTex    sync.Mutex
+	wg            sync.WaitGroup
 
-		cons   *consumer.Consumer
-		parser telegraf.Parser
-		cancel context.CancelFunc
-		acc    telegraf.TrackingAccumulator
-		sem    chan struct{}
+	contentDecodingFunc decodingFunc
 
-		checkpoint    consumer.Store
-		checkpoints   map[string]checkpoint
-		records       map[telegraf.TrackingID]string
-		checkpointTex sync.Mutex
-		recordsTex    sync.Mutex
-		wg            sync.WaitGroup
+	lastSeqNum string
+}
 
-		processContentEncodingFunc processContent
+type dynamoDB struct {
+	AppName   string `toml:"app_name"`
+	TableName string `toml:"table_name"`
+}
 
-		lastSeqNum *big.Int
-
-		common_aws.CredentialConfig
-	}
-
-	dynamoDB struct {
-		AppName   string `toml:"app_name"`
-		TableName string `toml:"table_name"`
-	}
-
-	checkpoint struct {
-		streamName string
-		shardID    string
-	}
-)
-
-type processContent func([]byte) ([]byte, error)
+type checkpoint struct {
+	streamName string
+	shardID    string
+}
 
 func (*KinesisConsumer) SampleConfig() string {
 	return sampleConfig
 }
 
 func (k *KinesisConsumer) Init() error {
-	return k.configureProcessContentEncodingFunc()
+	// Set defaults
+	if k.MaxUndeliveredMessages < 1 {
+		k.MaxUndeliveredMessages = 1000
+	}
+
+	if k.ShardIteratorType == "" {
+		k.ShardIteratorType = "TRIM_HORIZON"
+	}
+	if k.ContentEncoding == "" {
+		k.ContentEncoding = "identity"
+	}
+
+	f, err := getDecodingFunc(k.ContentEncoding)
+	if err != nil {
+		return err
+	}
+	k.contentDecodingFunc = f
+
+	return nil
 }
 
 func (k *KinesisConsumer) SetParser(parser telegraf.Parser) {
 	k.parser = parser
 }
 
-func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
-	err := k.connect(ac)
-	if err != nil {
-		return err
-	}
-
-	return nil
+func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error {
+	return k.connect(acc)
 }
 
 func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
 	if k.cons == nil {
 		return k.connect(acc)
 	}
-	k.lastSeqNum = maxSeq
+	// Enforce writing of last received sequence number
+	k.lastSeqNum = ""
 
 	return nil
 }
 
@@ -138,7 +129,7 @@ func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber stri
 	return nil
 }
 
-func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
+func (k *KinesisConsumer) connect(acc telegraf.Accumulator) error {
 	cfg, err := k.CredentialConfig.Credentials()
 	if err != nil {
 		return err
 	}
@@ -180,7 +171,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
 
 	k.cons = cons
 
-	k.acc = ac.WithTracking(k.MaxUndeliveredMessages)
+	k.acc = acc.WithTracking(k.MaxUndeliveredMessages)
 	k.records = make(map[telegraf.TrackingID]string, k.MaxUndeliveredMessages)
 	k.checkpoints = make(map[string]checkpoint, k.MaxUndeliveredMessages)
 	k.sem = make(chan struct{}, k.MaxUndeliveredMessages)
@@ -204,8 +195,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
 			case k.sem <- struct{}{}:
 				break
 			}
-			err := k.onMessage(k.acc, r)
-			if err != nil {
+			if err := k.onMessage(k.acc, r); err != nil {
 				<-k.sem
 				k.Log.Errorf("Scan parser error: %v", err)
 			}
@@ -223,7 +213,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
 }
 
 func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
-	data, err := k.processContentEncodingFunc(r.Data)
+	data, err := k.contentDecodingFunc(r.Data)
 	if err != nil {
 		return err
 	}
@@ -262,111 +252,37 @@ func (k *KinesisConsumer) onDelivery(ctx context.Context) {
 			delete(k.records, info.ID())
 			k.recordsTex.Unlock()
 
-			if info.Delivered() {
-				k.checkpointTex.Lock()
-				chk, ok := k.checkpoints[sequenceNum]
-				if !ok {
-					k.checkpointTex.Unlock()
-					continue
-				}
-				delete(k.checkpoints, sequenceNum)
-				k.checkpointTex.Unlock()
-
-				// at least once
-				if strToBint(sequenceNum).Cmp(k.lastSeqNum) > 0 {
-					continue
-				}
-
-				k.lastSeqNum = strToBint(sequenceNum)
-				if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
-					k.Log.Debugf("Setting checkpoint failed: %v", err)
-				}
-			} else {
+			if !info.Delivered() {
 				k.Log.Debug("Metric group failed to process")
+				continue
+			}
+
+			if k.lastSeqNum != "" {
+				continue
+			}
+
+			// Store the sequence number at least once per gather cycle using the checkpoint
+			// storage (usually DynamoDB).
+			k.checkpointTex.Lock()
+			chk, ok := k.checkpoints[sequenceNum]
+			if !ok {
+				k.checkpointTex.Unlock()
+				continue
+			}
+			delete(k.checkpoints, sequenceNum)
+			k.checkpointTex.Unlock()
+
+			k.Log.Tracef("persisting sequence number %q for stream %q and shard %q", sequenceNum, chk.streamName, chk.shardID)
+			k.lastSeqNum = sequenceNum
+			if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
+				k.Log.Errorf("Setting checkpoint failed: %v", err)
 			}
 		}
 	}
 }
 
-func processGzip(data []byte) ([]byte, error) {
-	zipData, err := gzip.NewReader(bytes.NewReader(data))
-	if err != nil {
-		return nil, err
-	}
-	defer zipData.Close()
-	return io.ReadAll(zipData)
-}
-
-func processZlib(data []byte) ([]byte, error) {
-	zlibData, err := zlib.NewReader(bytes.NewReader(data))
-	if err != nil {
-		return nil, err
-	}
-	defer zlibData.Close()
-	return io.ReadAll(zlibData)
-}
-
-func processNoOp(data []byte) ([]byte, error) {
-	return data, nil
-}
-
-func strToBint(s string) *big.Int {
-	n, ok := new(big.Int).SetString(s, 10)
-	if !ok {
-		return negOne
-	}
-	return n
-}
-
-func (k *KinesisConsumer) configureProcessContentEncodingFunc() error {
-	switch k.ContentEncoding {
-	case "gzip":
-		k.processContentEncodingFunc = processGzip
-	case "zlib":
-		k.processContentEncodingFunc = processZlib
-	case "none", "identity", "":
-		k.processContentEncodingFunc = processNoOp
-	default:
-		return fmt.Errorf("unknown content encoding %q", k.ContentEncoding)
-	}
-	return nil
-}
-
-type telegrafLoggerWrapper struct {
-	telegraf.Logger
-}
-
-func (t *telegrafLoggerWrapper) Log(args ...interface{}) {
-	t.Trace(args...)
-}
-
-func (t *telegrafLoggerWrapper) Logf(classification logging.Classification, format string, v ...interface{}) {
-	switch classification {
-	case logging.Debug:
-		format = "DEBUG " + format
-	case logging.Warn:
-		format = "WARN" + format
-	default:
-		format = "INFO " + format
-	}
-	t.Logger.Tracef(format, v...)
-}
-
-// noopStore implements the storage interface with discard
-type noopStore struct{}
-
-func (n noopStore) SetCheckpoint(_, _, _ string) error        { return nil }
-func (n noopStore) GetCheckpoint(_, _ string) (string, error) { return "", nil }
-
 func init() {
-	negOne, _ = new(big.Int).SetString("-1", 10)
-
 	inputs.Add("kinesis_consumer", func() telegraf.Input {
-		return &KinesisConsumer{
-			ShardIteratorType:      "TRIM_HORIZON",
-			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
-			lastSeqNum:             maxSeq,
-			ContentEncoding:        "identity",
-		}
+		return &KinesisConsumer{}
 	})
 }
diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go
index e09e0df37..b48372571 100644
--- a/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go
+++ b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go
@@ -14,220 +14,167 @@ import (
 	"github.com/influxdata/telegraf/testutil"
 )
 
-func TestKinesisConsumer_onMessage(t *testing.T) {
+func TestInvalidCoding(t *testing.T) {
+	plugin := &KinesisConsumer{
+		ContentEncoding: "notsupported",
+	}
+	require.ErrorContains(t, plugin.Init(), "unknown content encoding")
+}
+
+func TestOnMessage(t *testing.T) {
+	// Prepare messages
 	zlibBytpes, err := base64.StdEncoding.DecodeString(
 		"eF5FjlFrgzAUhf9KuM+2aNB2zdsQ2xe3whQGW8qIeqdhaiSJK0P874u1Y4+Hc/jON0GHxoga858BgUF8fs5fzunHU5Jlj6cEPFDXHvXStGqsrsKWTapq44pW1SetxsF1a8qsRtGt0Yy" +
 			"FKbUcrFT9UbYWtQH2frntkm/s7RInkNU6t9JpWNE5WBAFPo3CcHeg+9D703OziUOhCg6MQ/yakrspuZsyEjdYfsm+Jg2K1jZEfZLKQWUvFglylBobZXDLwSP8//EGpD4NNj7dUJpT6" +
 			"hQY3W33h/AhCt84zDBf5l/MDl08",
 	)
 	require.NoError(t, err)
+
 	gzippedBytes, err := base64.StdEncoding.DecodeString(
 		"H4sIAAFXNGAAA0WOUWuDMBSF/0q4z7Zo0HbN2xDbF7fCFAZbyoh6p2FqJIkrQ/zvi7Vjj4dz+M43QYfGiBrznwGBQXx+zl/O6cdTkmWPpwQ8UNce9dK0aqyuwpZNqmrjilbVJ63GwXVr" +
 			"yqxG0a3RjIUptRysVP1Rtha1AfZ+ue2Sb+ztEieQ1Tq30mlY0TlYEAU+jcJwd6D70PvTc7OJQ6EKDoxD/JqSuym5mzISN1h+yb4mDYrWNkR9kspBZS8WCXKUGhtlcMvBI/z/8QakPg02" +
 			"Pt1QmlPqFBjdbfeH8CEK3zjMMF/mX0TaxZUpAQAA",
 	)
 	require.NoError(t, err)
-	notZippedBytes := []byte(`{
-	"messageType": "CONTROL_MESSAGE",
-	"owner": "CloudwatchLogs",
-	"logGroup": "",
-	"logStream": "",
-	"subscriptionFilters": [],
-	"logEvents": [
-		{
-			"id": "",
-			"timestamp": 1510254469274,
-			"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"
-		},
-		{
-			"id": "",
-			"timestamp": 1510254469274,
-			"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"
-		}
-	]
-}`)
-	parser := &json.Parser{
-		MetricName:   "json_test",
-		Query:        "logEvents",
-		StringFields: []string{"message"},
-	}
-	require.NoError(t, parser.Init())
-	type fields struct {
-		ContentEncoding string
-		parser          telegraf.Parser
-		records         map[telegraf.TrackingID]string
-	}
-	type args struct {
-		r *consumer.Record
-	}
-	type expected struct {
-		numberOfMetrics int
-		messageContains string
+	notZippedBytes := []byte(`
+	{
+		"messageType": "CONTROL_MESSAGE",
+		"owner": "CloudwatchLogs",
+		"logGroup": "",
+		"logStream": "",
+		"subscriptionFilters": [],
+		"logEvents": [
+			{
+				"id": "",
+				"timestamp": 1510254469274,
+				"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"
+			},
+			{
+				"id": "",
+				"timestamp": 1510254469274,
+				"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"
+			}
+		]
 	}
+	`)
+
 	tests := []struct {
-		name     string
-		fields   fields
-		args     args
-		wantErr  bool
-		expected expected
+		name            string
+		encoding        string
+		records         map[telegraf.TrackingID]string
+		args            *consumer.Record
+		expectedNumber  int
+		expectedContent string
 	}{
 		{
-			name: "test no compression",
-			fields: fields{
-				ContentEncoding: "none",
-				parser:          parser,
-				records:         make(map[telegraf.TrackingID]string),
-			},
-			args: args{
-				r: &consumer.Record{
-					Record: types.Record{
-						Data:           notZippedBytes,
-						SequenceNumber: aws.String("anything"),
-					},
+			name:     "test no compression",
+			encoding: "none",
+			records:  make(map[telegraf.TrackingID]string),
+			args: &consumer.Record{
+				Record: types.Record{
+					Data:           notZippedBytes,
+					SequenceNumber: aws.String("anything"),
 				},
 			},
-			wantErr: false,
-			expected: expected{
-				messageContains: "bob",
-				numberOfMetrics: 2,
-			},
+			expectedNumber:  2,
+			expectedContent: "bob",
 		},
 		{
-			name: "test no compression via empty string for ContentEncoding",
-			fields: fields{
-				ContentEncoding: "",
-				parser:          parser,
-				records:         make(map[telegraf.TrackingID]string),
-			},
-			args: args{
-				r: &consumer.Record{
-					Record: types.Record{
-						Data:           notZippedBytes,
-						SequenceNumber: aws.String("anything"),
-					},
+			name:    "test no compression via empty string for ContentEncoding",
+			records: make(map[telegraf.TrackingID]string),
+			args: &consumer.Record{
+				Record: types.Record{
+					Data:           notZippedBytes,
+					SequenceNumber: aws.String("anything"),
 				},
 			},
-			wantErr: false,
-			expected: expected{
-				messageContains: "bob",
-				numberOfMetrics: 2,
-			},
+			expectedNumber:  2,
+			expectedContent: "bob",
 		},
 		{
-			name: "test no compression via identity ContentEncoding",
-			fields: fields{
-				ContentEncoding: "identity",
-				parser:          parser,
-				records:         make(map[telegraf.TrackingID]string),
-			},
-			args: args{
-				r: &consumer.Record{
-					Record: types.Record{
-						Data:           notZippedBytes,
-						SequenceNumber: aws.String("anything"),
-					},
+			name:     "test no compression via identity ContentEncoding",
+			encoding: "identity",
+			records:  make(map[telegraf.TrackingID]string),
+			args: &consumer.Record{
+				Record: types.Record{
+					Data:           notZippedBytes,
+					SequenceNumber: aws.String("anything"),
 				},
 			},
-			wantErr: false,
-			expected: expected{
-				messageContains: "bob",
-				numberOfMetrics: 2,
-			},
+			expectedNumber:  2,
+			expectedContent: "bob",
 		},
 		{
-			name: "test no compression via no ContentEncoding",
-			fields: fields{
-				parser:  parser,
-				records: make(map[telegraf.TrackingID]string),
-			},
-			args: args{
-				r: &consumer.Record{
-					Record: types.Record{
-						Data:           notZippedBytes,
-						SequenceNumber: aws.String("anything"),
-					},
+			name:    "test no compression via no ContentEncoding",
+			records: make(map[telegraf.TrackingID]string),
+			args: &consumer.Record{
+				Record: types.Record{
+					Data:           notZippedBytes,
+					SequenceNumber: aws.String("anything"),
 				},
 			},
-			wantErr: false,
-			expected: expected{
-				messageContains: "bob",
-				numberOfMetrics: 2,
-			},
+			expectedNumber:  2,
+			expectedContent: "bob",
 		},
 		{
-			name: "test gzip compression",
-			fields: fields{
-				ContentEncoding: "gzip",
-				parser:          parser,
-				records:         make(map[telegraf.TrackingID]string),
-			},
-			args: args{
-				r: &consumer.Record{
-					Record: types.Record{
-						Data:           gzippedBytes,
-						SequenceNumber: aws.String("anything"),
-					},
+			name:     "test gzip compression",
+			encoding: "gzip",
+			records:  make(map[telegraf.TrackingID]string),
+			args: &consumer.Record{
+				Record: types.Record{
+					Data:           gzippedBytes,
+					SequenceNumber: aws.String("anything"),
 				},
 			},
-			wantErr: false,
-			expected: expected{
-				messageContains: "bob",
-				numberOfMetrics: 1,
-			},
+			expectedNumber:  1,
+			expectedContent: "bob",
 		},
 		{
-			name: "test zlib compression",
-			fields: fields{
-				ContentEncoding: "zlib",
-				parser:          parser,
-				records:         make(map[telegraf.TrackingID]string),
-			},
-			args: args{
-				r: &consumer.Record{
-					Record: types.Record{
-						Data:           zlibBytpes,
-						SequenceNumber: aws.String("anything"),
-					},
+			name:     "test zlib compression",
+			encoding: "zlib",
+			records:  make(map[telegraf.TrackingID]string),
+			args: &consumer.Record{
+				Record: types.Record{
+					Data:           zlibBytpes,
+					SequenceNumber: aws.String("anything"),
 				},
 			},
-			wantErr: false,
-			expected: expected{
-				messageContains: "bob",
-				numberOfMetrics: 1,
-			},
+			expectedNumber:  1,
+			expectedContent: "bob",
 		},
 	}
 
-	k := &KinesisConsumer{
-		ContentEncoding: "notsupported",
-	}
-	err = k.Init()
-	require.Error(t, err)
-
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			k := &KinesisConsumer{
-				ContentEncoding: tt.fields.ContentEncoding,
-				parser:          tt.fields.parser,
-				records:         tt.fields.records,
+			// Prepare JSON parser
+			parser := &json.Parser{
+				MetricName:   "json_test",
+				Query:        "logEvents",
+				StringFields: []string{"message"},
 			}
-			err := k.Init()
-			require.NoError(t, err)
+			require.NoError(t, parser.Init())
 
-			acc := testutil.Accumulator{}
-			if err := k.onMessage(acc.WithTracking(tt.expected.numberOfMetrics), tt.args.r); (err != nil) != tt.wantErr {
-				t.Errorf("onMessage() error = %v, wantErr %v", err, tt.wantErr)
+			// Setup plugin
+			plugin := &KinesisConsumer{
+				ContentEncoding: tt.encoding,
+				parser:          parser,
+				records:         tt.records,
 			}
+			require.NoError(t, plugin.Init())
 
-			require.Len(t, acc.Metrics, tt.expected.numberOfMetrics)
+			var acc testutil.Accumulator
+			require.NoError(t, plugin.onMessage(acc.WithTracking(tt.expectedNumber), tt.args))
 
-			for _, metric := range acc.Metrics {
-				if logEventMessage, ok := metric.Fields["message"]; ok {
-					require.Contains(t, logEventMessage.(string), tt.expected.messageContains)
-				} else {
-					t.Errorf("Expect logEvents to be present")
-				}
+			actual := acc.GetTelegrafMetrics()
+			require.Len(t, actual, tt.expectedNumber)
+
+			for _, metric := range actual {
+				raw, found := metric.GetField("message")
+				require.True(t, found, "no message present")
+				message, ok := raw.(string)
+				require.Truef(t, ok, "message not a string but %T", raw)
+				require.Contains(t, message, tt.expectedContent)
 			}
 		})
 	}
diff --git a/plugins/inputs/kinesis_consumer/logging.go b/plugins/inputs/kinesis_consumer/logging.go
new file mode 100644
index 000000000..82e945865
--- /dev/null
+++ b/plugins/inputs/kinesis_consumer/logging.go
@@ -0,0 +1,27 @@
+package kinesis_consumer
+
+import (
+	"github.com/aws/smithy-go/logging"
+
+	"github.com/influxdata/telegraf"
+)
+
+type telegrafLoggerWrapper struct {
+	telegraf.Logger
+}
+
+func (t *telegrafLoggerWrapper) Log(args ...interface{}) {
+	t.Trace(args...)
+}
+
+func (t *telegrafLoggerWrapper) Logf(classification logging.Classification, format string, v ...interface{}) {
+	switch classification {
+	case logging.Debug:
+		format = "DEBUG " + format
+	case logging.Warn:
+		format = "WARN " + format
+	default:
+		format = "INFO " + format
+	}
+	t.Logger.Tracef(format, v...)
+}
diff --git a/plugins/inputs/kinesis_consumer/noop_store.go b/plugins/inputs/kinesis_consumer/noop_store.go
new file mode 100644
index 000000000..f400fdc71
--- /dev/null
+++ b/plugins/inputs/kinesis_consumer/noop_store.go
@@ -0,0 +1,7 @@
+package kinesis_consumer
+
+// noopStore implements the consumer store interface, discarding all checkpoint data
+type noopStore struct{}
+
+func (noopStore) SetCheckpoint(_, _, _ string) error        { return nil }
+func (noopStore) GetCheckpoint(_, _ string) (string, error) { return "", nil }
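
For review context, a minimal sketch (not part of the patch) of how the decoding
helper added in encoding.go behaves end to end. getDecodingFunc, processGzip, and
the accepted encodings ("gzip", "zlib", "none", "identity", "") come from the
change above; the payload and the Example wrapper are illustrative assumptions:

package kinesis_consumer

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// Example_getDecodingFunc round-trips a payload through gzip and the decoding
// helper introduced in encoding.go. An unknown encoding returns an error, which
// Init() now surfaces and TestInvalidCoding asserts.
func Example_getDecodingFunc() {
	// Compress a small payload so there is something to decode.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(`{"bob":"hello"}`)); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// Look up the decoder exactly as Init() does for content_encoding = "gzip".
	decode, err := getDecodingFunc("gzip")
	if err != nil {
		panic(err)
	}
	data, err := decode(buf.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
	// Output: {"bob":"hello"}
}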