chore(inputs.kinesis_consumer): Cleanup code (#16267)

Co-authored-by: Dane Strandboge <136023093+DStrand1@users.noreply.github.com>
Sven Rebhan 2024-12-11 21:25:03 +01:00 committed by GitHub
parent bcea9a28c0
commit 05435e47d3
5 changed files with 273 additions and 331 deletions

View File

@@ -0,0 +1,45 @@
package kinesis_consumer
import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"io"
)
type decodingFunc func([]byte) ([]byte, error)
func processGzip(data []byte) ([]byte, error) {
zipData, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zipData.Close()
return io.ReadAll(zipData)
}
func processZlib(data []byte) ([]byte, error) {
zlibData, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zlibData.Close()
return io.ReadAll(zlibData)
}
func processNoOp(data []byte) ([]byte, error) {
return data, nil
}
func getDecodingFunc(encoding string) (decodingFunc, error) {
switch encoding {
case "gzip":
return processGzip, nil
case "zlib":
return processZlib, nil
case "none", "identity", "":
return processNoOp, nil
}
return nil, fmt.Errorf("unknown content encoding %q", encoding)
}
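
A minimal, self-contained sketch of how these helpers behave end to end (a hypothetical main package, not part of the commit): compress a payload the way a producer might, then decode it via the function getDecodingFunc would select for content_encoding = "gzip".

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// processGzip mirrors the helper above: inflate a gzip payload.
func processGzip(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	// Compress a sample record payload...
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write([]byte(`{"msg":"hello"}`)); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// ...then decode it as the plugin would for gzip-encoded records.
	out, err := processGzip(buf.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"msg":"hello"}
}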

View File

@@ -2,23 +2,15 @@
package kinesis_consumer
import (
"bytes"
"compress/gzip"
"compress/zlib"
"context"
_ "embed"
"errors"
"fmt"
"io"
"math/big"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/smithy-go/logging"
consumer "github.com/harlow/kinesis-consumer"
"github.com/harlow/kinesis-consumer/store/ddb"
@@ -31,86 +23,85 @@ import (
//go:embed sample.conf
var sampleConfig string
var (
once sync.Once
// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
maxSeq = strToBint(strings.Repeat("9", 129))
negOne *big.Int
)
var once sync.Once
const (
defaultMaxUndeliveredMessages = 1000
)
type KinesisConsumer struct {
StreamName string `toml:"streamname"`
ShardIteratorType string `toml:"shard_iterator_type"`
DynamoDB *dynamoDB `toml:"checkpoint_dynamodb"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
ContentEncoding string `toml:"content_encoding"`
Log telegraf.Logger `toml:"-"`
common_aws.CredentialConfig
type (
KinesisConsumer struct {
StreamName string `toml:"streamname"`
ShardIteratorType string `toml:"shard_iterator_type"`
DynamoDB *dynamoDB `toml:"checkpoint_dynamodb"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
ContentEncoding string `toml:"content_encoding"`
cons *consumer.Consumer
parser telegraf.Parser
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem chan struct{}
Log telegraf.Logger `toml:"-"`
checkpoint consumer.Store
checkpoints map[string]checkpoint
records map[telegraf.TrackingID]string
checkpointTex sync.Mutex
recordsTex sync.Mutex
wg sync.WaitGroup
cons *consumer.Consumer
parser telegraf.Parser
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem chan struct{}
contentDecodingFunc decodingFunc
checkpoint consumer.Store
checkpoints map[string]checkpoint
records map[telegraf.TrackingID]string
checkpointTex sync.Mutex
recordsTex sync.Mutex
wg sync.WaitGroup
lastSeqNum string
}
processContentEncodingFunc processContent
type dynamoDB struct {
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
}
lastSeqNum *big.Int
common_aws.CredentialConfig
}
dynamoDB struct {
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
}
checkpoint struct {
streamName string
shardID string
}
)
type processContent func([]byte) ([]byte, error)
type checkpoint struct {
streamName string
shardID string
}
func (*KinesisConsumer) SampleConfig() string {
return sampleConfig
}
func (k *KinesisConsumer) Init() error {
return k.configureProcessContentEncodingFunc()
// Set defaults
if k.MaxUndeliveredMessages < 1 {
k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
}
if k.ShardIteratorType == "" {
k.ShardIteratorType = "TRIM_HORIZON"
}
if k.ContentEncoding == "" {
k.ContentEncoding = "identity"
}
f, err := getDecodingFunc(k.ContentEncoding)
if err != nil {
return err
}
k.contentDecodingFunc = f
return nil
}
func (k *KinesisConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
}
func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
err := k.connect(ac)
if err != nil {
return err
}
return nil
func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error {
return k.connect(acc)
}
func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
if k.cons == nil {
return k.connect(acc)
}
k.lastSeqNum = maxSeq
// Enforce writing of last received sequence number
k.lastSeqNum = ""
return nil
}
@@ -138,7 +129,7 @@ func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber stri
return nil
}
func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
func (k *KinesisConsumer) connect(acc telegraf.Accumulator) error {
cfg, err := k.CredentialConfig.Credentials()
if err != nil {
return err
@@ -180,7 +171,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
k.cons = cons
k.acc = ac.WithTracking(k.MaxUndeliveredMessages)
k.acc = acc.WithTracking(k.MaxUndeliveredMessages)
k.records = make(map[telegraf.TrackingID]string, k.MaxUndeliveredMessages)
k.checkpoints = make(map[string]checkpoint, k.MaxUndeliveredMessages)
k.sem = make(chan struct{}, k.MaxUndeliveredMessages)
@@ -204,8 +195,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
case k.sem <- struct{}{}:
break
}
err := k.onMessage(k.acc, r)
if err != nil {
if err := k.onMessage(k.acc, r); err != nil {
<-k.sem
k.Log.Errorf("Scan parser error: %v", err)
}
@@ -223,7 +213,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
}
func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
data, err := k.processContentEncodingFunc(r.Data)
data, err := k.contentDecodingFunc(r.Data)
if err != nil {
return err
}
@@ -262,111 +252,37 @@ func (k *KinesisConsumer) onDelivery(ctx context.Context) {
delete(k.records, info.ID())
k.recordsTex.Unlock()
if info.Delivered() {
k.checkpointTex.Lock()
chk, ok := k.checkpoints[sequenceNum]
if !ok {
k.checkpointTex.Unlock()
continue
}
delete(k.checkpoints, sequenceNum)
k.checkpointTex.Unlock()
// at least once
if strToBint(sequenceNum).Cmp(k.lastSeqNum) > 0 {
continue
}
k.lastSeqNum = strToBint(sequenceNum)
if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
k.Log.Debugf("Setting checkpoint failed: %v", err)
}
} else {
if !info.Delivered() {
k.Log.Debug("Metric group failed to process")
continue
}
if k.lastSeqNum != "" {
continue
}
// Store the sequence number at least once per gather cycle using the checkpoint
// storage (usually DynamoDB).
k.checkpointTex.Lock()
chk, ok := k.checkpoints[sequenceNum]
if !ok {
k.checkpointTex.Unlock()
continue
}
delete(k.checkpoints, sequenceNum)
k.checkpointTex.Unlock()
k.Log.Tracef("persisting sequence number %q for stream %q and shard %q", sequenceNum)
k.lastSeqNum = sequenceNum
if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
k.Log.Errorf("Setting checkpoint failed: %v", err)
}
}
}
}
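
The rewritten onDelivery() persists a checkpoint at most once per gather cycle: Gather() clears lastSeqNum, and the first delivered record afterwards sets it again. A stripped-down sketch of that gate (hypothetical standalone code, not part of the commit):

package main

import "fmt"

func main() {
	lastSeqNum := "" // Gather() resets this each cycle to force one write
	for _, seq := range []string{"101", "102", "103"} {
		if lastSeqNum != "" {
			continue // a checkpoint was already written this cycle
		}
		lastSeqNum = seq
		fmt.Println("checkpointing", seq) // only "101" is persisted
	}
}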
func processGzip(data []byte) ([]byte, error) {
zipData, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zipData.Close()
return io.ReadAll(zipData)
}
func processZlib(data []byte) ([]byte, error) {
zlibData, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zlibData.Close()
return io.ReadAll(zlibData)
}
func processNoOp(data []byte) ([]byte, error) {
return data, nil
}
func strToBint(s string) *big.Int {
n, ok := new(big.Int).SetString(s, 10)
if !ok {
return negOne
}
return n
}
func (k *KinesisConsumer) configureProcessContentEncodingFunc() error {
switch k.ContentEncoding {
case "gzip":
k.processContentEncodingFunc = processGzip
case "zlib":
k.processContentEncodingFunc = processZlib
case "none", "identity", "":
k.processContentEncodingFunc = processNoOp
default:
return fmt.Errorf("unknown content encoding %q", k.ContentEncoding)
}
return nil
}
type telegrafLoggerWrapper struct {
telegraf.Logger
}
func (t *telegrafLoggerWrapper) Log(args ...interface{}) {
t.Trace(args...)
}
func (t *telegrafLoggerWrapper) Logf(classification logging.Classification, format string, v ...interface{}) {
switch classification {
case logging.Debug:
format = "DEBUG " + format
case logging.Warn:
format = "WARN" + format
default:
format = "INFO " + format
}
t.Logger.Tracef(format, v...)
}
// noopStore implements the storage interface, discarding all checkpoints
type noopStore struct{}
func (n noopStore) SetCheckpoint(_, _, _ string) error { return nil }
func (n noopStore) GetCheckpoint(_, _ string) (string, error) { return "", nil }
func init() {
negOne, _ = new(big.Int).SetString("-1", 10)
inputs.Add("kinesis_consumer", func() telegraf.Input {
return &KinesisConsumer{
ShardIteratorType: "TRIM_HORIZON",
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
lastSeqNum: maxSeq,
ContentEncoding: "identity",
}
return &KinesisConsumer{}
})
}
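
The scan loop above bounds the number of un-acknowledged records with a semaphore channel. A self-contained sketch of that pattern (hypothetical names; in the plugin, slots are freed on scan errors and in onDelivery()):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxUndelivered = 2 // stands in for max_undelivered_messages

	sem := make(chan struct{}, maxUndelivered)
	acks := make(chan int)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for id := range acks {
			fmt.Println("delivered", id)
			<-sem // an acknowledgement frees one in-flight slot
		}
	}()

	for id := 0; id < 5; id++ {
		sem <- struct{}{} // blocks once maxUndelivered records are un-acked
		fmt.Println("dispatched", id)
		acks <- id // the plugin gets this signal via the tracking accumulator
	}
	close(acks)
	wg.Wait()
}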

View File

@@ -14,220 +14,167 @@ import (
"github.com/influxdata/telegraf/testutil"
)
func TestKinesisConsumer_onMessage(t *testing.T) {
func TestInvalidCoding(t *testing.T) {
plugin := &KinesisConsumer{
ContentEncoding: "notsupported",
}
require.ErrorContains(t, plugin.Init(), "unknown content encoding")
}
func TestOnMessage(t *testing.T) {
// Prepare messages
zlibBytes, err := base64.StdEncoding.DecodeString(
"eF5FjlFrgzAUhf9KuM+2aNB2zdsQ2xe3whQGW8qIeqdhaiSJK0P874u1Y4+Hc/jON0GHxoga858BgUF8fs5fzunHU5Jlj6cEPFDXHvXStGqsrsKWTapq44pW1SetxsF1a8qsRtGt0Yy" +
"FKbUcrFT9UbYWtQH2frntkm/s7RInkNU6t9JpWNE5WBAFPo3CcHeg+9D703OziUOhCg6MQ/yakrspuZsyEjdYfsm+Jg2K1jZEfZLKQWUvFglylBobZXDLwSP8//EGpD4NNj7dUJpT6" +
"hQY3W33h/AhCt84zDBf5l/MDl08",
)
require.NoError(t, err)
gzippedBytes, err := base64.StdEncoding.DecodeString(
"H4sIAAFXNGAAA0WOUWuDMBSF/0q4z7Zo0HbN2xDbF7fCFAZbyoh6p2FqJIkrQ/zvi7Vjj4dz+M43QYfGiBrznwGBQXx+zl/O6cdTkmWPpwQ8UNce9dK0aqyuwpZNqmrjilbVJ63GwXVr" +
"yqxG0a3RjIUptRysVP1Rtha1AfZ+ue2Sb+ztEieQ1Tq30mlY0TlYEAU+jcJwd6D70PvTc7OJQ6EKDoxD/JqSuym5mzISN1h+yb4mDYrWNkR9kspBZS8WCXKUGhtlcMvBI/z/8QakPg02" +
"Pt1QmlPqFBjdbfeH8CEK3zjMMF/mX0TaxZUpAQAA",
)
require.NoError(t, err)
notZippedBytes := []byte(`{
"messageType": "CONTROL_MESSAGE",
"owner": "CloudwatchLogs",
"logGroup": "",
"logStream": "",
"subscriptionFilters": [],
"logEvents": [
{
"id": "",
"timestamp": 1510254469274,
"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"
},
{
"id": "",
"timestamp": 1510254469274,
"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"
}
]
}`)
parser := &json.Parser{
MetricName: "json_test",
Query: "logEvents",
StringFields: []string{"message"},
}
require.NoError(t, parser.Init())
type fields struct {
ContentEncoding string
parser telegraf.Parser
records map[telegraf.TrackingID]string
}
type args struct {
r *consumer.Record
}
type expected struct {
numberOfMetrics int
messageContains string
notZippedBytes := []byte(`
{
"messageType": "CONTROL_MESSAGE",
"owner": "CloudwatchLogs",
"logGroup": "",
"logStream": "",
"subscriptionFilters": [],
"logEvents": [
{
"id": "",
"timestamp": 1510254469274,
"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"
},
{
"id": "",
"timestamp": 1510254469274,
"message": "{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"
}
]
}
`)
tests := []struct {
name string
fields fields
args args
wantErr bool
expected expected
name string
encoding string
records map[telegraf.TrackingID]string
args *consumer.Record
expectedNumber int
expectedContent string
}{
{
name: "test no compression",
fields: fields{
ContentEncoding: "none",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
name: "test no compression",
encoding: "none",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 2,
},
expectedNumber: 2,
expectedContent: "bob",
},
{
name: "test no compression via empty string for ContentEncoding",
fields: fields{
ContentEncoding: "",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
name: "test no compression via empty string for ContentEncoding",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 2,
},
expectedNumber: 2,
expectedContent: "bob",
},
{
name: "test no compression via identity ContentEncoding",
fields: fields{
ContentEncoding: "identity",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
name: "test no compression via identity ContentEncoding",
encoding: "identity",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 2,
},
expectedNumber: 2,
expectedContent: "bob",
},
{
name: "test no compression via no ContentEncoding",
fields: fields{
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
name: "test no compression via no ContentEncoding",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 2,
},
expectedNumber: 2,
expectedContent: "bob",
},
{
name: "test gzip compression",
fields: fields{
ContentEncoding: "gzip",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{
Record: types.Record{
Data: gzippedBytes,
SequenceNumber: aws.String("anything"),
},
name: "test gzip compression",
encoding: "gzip",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: gzippedBytes,
SequenceNumber: aws.String("anything"),
},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 1,
},
expectedNumber: 1,
expectedContent: "bob",
},
{
name: "test zlib compression",
fields: fields{
ContentEncoding: "zlib",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{
Record: types.Record{
Data: zlibBytes,
SequenceNumber: aws.String("anything"),
},
name: "test zlib compression",
encoding: "zlib",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: zlibBytes,
SequenceNumber: aws.String("anything"),
},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 1,
},
expectedNumber: 1,
expectedContent: "bob",
},
}
k := &KinesisConsumer{
ContentEncoding: "notsupported",
}
err = k.Init()
require.Error(t, err)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
k := &KinesisConsumer{
ContentEncoding: tt.fields.ContentEncoding,
parser: tt.fields.parser,
records: tt.fields.records,
// Prepare JSON parser
parser := &json.Parser{
MetricName: "json_test",
Query: "logEvents",
StringFields: []string{"message"},
}
err := k.Init()
require.NoError(t, err)
require.NoError(t, parser.Init())
acc := testutil.Accumulator{}
if err := k.onMessage(acc.WithTracking(tt.expected.numberOfMetrics), tt.args.r); (err != nil) != tt.wantErr {
t.Errorf("onMessage() error = %v, wantErr %v", err, tt.wantErr)
// Setup plugin
plugin := &KinesisConsumer{
ContentEncoding: tt.encoding,
parser: parser,
records: tt.records,
}
require.NoError(t, plugin.Init())
require.Len(t, acc.Metrics, tt.expected.numberOfMetrics)
var acc testutil.Accumulator
require.NoError(t, plugin.onMessage(acc.WithTracking(tt.expectedNumber), tt.args))
for _, metric := range acc.Metrics {
if logEventMessage, ok := metric.Fields["message"]; ok {
require.Contains(t, logEventMessage.(string), tt.expected.messageContains)
} else {
t.Errorf("Expect logEvents to be present")
}
actual := acc.GetTelegrafMetrics()
require.Len(t, actual, tt.expectedNumber)
for _, metric := range actual {
raw, found := metric.GetField("message")
require.True(t, found, "no message present")
message, ok := raw.(string)
require.Truef(t, ok, "message not a string but %T", raw)
require.Contains(t, message, tt.expectedContent)
}
})
}

View File

@@ -0,0 +1,27 @@
package kinesis_consumer
import (
"github.com/aws/smithy-go/logging"
"github.com/influxdata/telegraf"
)
type telegrafLoggerWrapper struct {
telegraf.Logger
}
func (t *telegrafLoggerWrapper) Log(args ...interface{}) {
t.Trace(args...)
}
func (t *telegrafLoggerWrapper) Logf(classification logging.Classification, format string, v ...interface{}) {
switch classification {
case logging.Debug:
format = "DEBUG " + format
case logging.Warn:
format = "WARN" + format
default:
format = "INFO " + format
}
t.Logger.Tracef(format, v...)
}
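
For context, a hypothetical in-package helper showing where the wrapper plugs in: aws.Config exposes a Logger field that accepts any smithy-go logging.Logger, so SDK output can be routed into Telegraf's trace log (a sketch, not part of the commit):

package kinesis_consumer

import (
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/influxdata/telegraf"
)

// newKinesisClient is a hypothetical helper: it wires the wrapper into the
// AWS SDK so client-level logging ends up in Telegraf's trace output.
func newKinesisClient(cfg aws.Config, log telegraf.Logger) *kinesis.Client {
	cfg.Logger = &telegrafLoggerWrapper{Logger: log}
	return kinesis.NewFromConfig(cfg)
}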

View File

@@ -0,0 +1,7 @@
package kinesis_consumer
// noopStore implements the storage interface, discarding all checkpoints
type noopStore struct{}
func (noopStore) SetCheckpoint(_, _, _ string) error { return nil }
func (noopStore) GetCheckpoint(_, _ string) (string, error) { return "", nil }
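
Since the plugin assigns noopStore to its consumer.Store-typed checkpoint field, the following compile-time assertion (a sketch, not part of the commit) documents that contract explicitly:

package kinesis_consumer

import consumer "github.com/harlow/kinesis-consumer"

// noopStore is used when no DynamoDB checkpoint backend is configured;
// this assertion verifies it satisfies the consumer.Store interface.
var _ consumer.Store = noopStore{}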