From f636016cb63d31ee89710e9fd81b70100ea6aa3f Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 5 Mar 2025 16:24:24 +0100 Subject: [PATCH] chore(inputs.kinesis_consumer): Replace consumer library by own implementation (#16332) --- docs/LICENSE_OF_DEPENDENCIES.md | 4 - go.mod | 4 - go.sum | 76 ---- plugins/inputs/kinesis_consumer/README.md | 56 +-- plugins/inputs/kinesis_consumer/consumer.go | 355 ++++++++++++++++++ .../kinesis_consumer/kinesis_consumer.go | 300 +++++++-------- .../kinesis_consumer/kinesis_consumer_test.go | 69 ++-- plugins/inputs/kinesis_consumer/noop_store.go | 7 - plugins/inputs/kinesis_consumer/sample.conf | 46 ++- plugins/inputs/kinesis_consumer/store.go | 179 +++++++++ 10 files changed, 762 insertions(+), 334 deletions(-) create mode 100644 plugins/inputs/kinesis_consumer/consumer.go delete mode 100644 plugins/inputs/kinesis_consumer/noop_store.go create mode 100644 plugins/inputs/kinesis_consumer/store.go diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 83878a82e..27f2729bd 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -69,7 +69,6 @@ following works: - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/aws/protocol/eventstream/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/config [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/config/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/credentials [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/credentials/LICENSE.txt) -- github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/dynamodb/attributevalue/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/feature/ec2/imds [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/ec2/imds/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/feature/s3/manager [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/s3/manager/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/internal/configsources [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/internal/configsources/LICENSE.txt) @@ -79,7 +78,6 @@ following works: - github.com/aws/aws-sdk-go-v2/service/cloudwatch [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/cloudwatch/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/cloudwatchlogs/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/service/dynamodb [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/dynamodb/LICENSE.txt) -- github.com/aws/aws-sdk-go-v2/service/dynamodbstreams [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/dynamodbstreams/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/service/ec2 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/ec2/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/accept-encoding/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/service/internal/checksum [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/checksum/LICENSE.txt) @@ -93,7 +91,6 @@ following works: - github.com/aws/aws-sdk-go-v2/service/sts [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/sts/LICENSE.txt) - github.com/aws/aws-sdk-go-v2/service/timestreamwrite [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/timestreamwrite/LICENSE.txt) - github.com/aws/smithy-go [Apache License 2.0](https://github.com/aws/smithy-go/blob/main/LICENSE) -- github.com/awslabs/kinesis-aggregation/go [Apache License 2.0](https://github.com/awslabs/kinesis-aggregation/blob/master/LICENSE.txt) - github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE) - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE) @@ -203,7 +200,6 @@ following works: - github.com/gsterjov/go-libsecret [MIT License](https://github.com/gsterjov/go-libsecret/blob/master/LICENSE) - github.com/gwos/tcg/sdk [MIT License](https://github.com/gwos/tcg/blob/master/LICENSE) - github.com/hailocab/go-hostpool [MIT License](https://github.com/hailocab/go-hostpool/blob/master/LICENSE) -- github.com/harlow/kinesis-consumer [MIT License](https://github.com/harlow/kinesis-consumer/blob/master/LICENSE) - github.com/hashicorp/consul/api [Mozilla Public License 2.0](https://github.com/hashicorp/consul/blob/main/api/LICENSE) - github.com/hashicorp/errwrap [Mozilla Public License 2.0](https://github.com/hashicorp/errwrap/blob/master/LICENSE) - github.com/hashicorp/go-cleanhttp [Mozilla Public License 2.0](https://github.com/hashicorp/go-cleanhttp/blob/master/LICENSE) diff --git a/go.mod b/go.mod index a199fad4b..e1ca2bcb4 100644 --- a/go.mod +++ b/go.mod @@ -111,7 +111,6 @@ require ( github.com/gosnmp/gosnmp v1.38.0 github.com/grid-x/modbus v0.0.0-20240503115206-582f2ab60a18 github.com/gwos/tcg/sdk v0.0.0-20240830123415-f8a34bba6358 - github.com/harlow/kinesis-consumer v0.3.6-0.20240916192723-43900507c911 github.com/hashicorp/consul/api v1.29.2 github.com/hashicorp/go-uuid v1.0.3 github.com/hashicorp/golang-lru/v2 v2.0.7 @@ -286,13 +285,11 @@ require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/awnumar/memcall v0.3.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect - github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.7 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.33 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.33 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect - github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.1 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13 // indirect @@ -301,7 +298,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect - github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bitly/go-hostpool v0.1.0 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect diff --git a/go.sum b/go.sum index efbc24f82..457f0bd5d 100644 --- a/go.sum +++ b/go.sum @@ -733,7 +733,6 @@ github.com/ClickHouse/ch-go v0.64.1 h1:FWpP+QU4KchgzpEekuv8YoI/fUc4H2r6Bwc5Wwrzv github.com/ClickHouse/ch-go v0.64.1/go.mod h1:RBUynvczWwVzhS6Up9lPKlH1mrk4UAmle6uzCiW4Pkc= github.com/ClickHouse/clickhouse-go/v2 v2.30.3 h1:m0VZqUNCJ7lOmZfmOE3HZUMixZHftKmZLqcrz2+UVHk= github.com/ClickHouse/clickhouse-go/v2 v2.30.3/go.mod h1:V1aZaG0ctMbd8KVi+D4loXi97duWYtHiQHMCgipKJcI= -github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= @@ -820,8 +819,6 @@ github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrI github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs= github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI= github.com/aliyun/alibaba-cloud-sdk-go v1.62.721 h1:OwLOwY8UfcuwE2eoKA2CxNewpUQv8Qnmpf7UcYNihvk= @@ -856,10 +853,6 @@ github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2 github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= -github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY= -github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo= -github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= -github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys= github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc h1:LoL75er+LKDHDUfU5tRvFwxH0LjPpZN8OoG8Ll+liGU= github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc/go.mod h1:w648aMHEgFYS6xb0KVMMtZ2uMeemhiKCuD2vj6gY52A= github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 h1:Bmjk+DjIi3tTAU0wxGaFbfjGUqlxxSXARq9A96Kgoos= @@ -878,43 +871,30 @@ github.com/awnumar/memcall v0.3.0 h1:8b/3Sptrtgejj2kLgL6M5F2r4OzTf19CTllO+gIXUg8 github.com/awnumar/memcall v0.3.0/go.mod h1:8xOx1YbfyuCg3Fy6TO8DK0kZUua3V42/goA5Ru47E8w= github.com/awnumar/memguard v0.22.5 h1:PH7sbUVERS5DdXh3+mLo8FDcl1eIeVjJVYMnyuYpvuI= github.com/awnumar/memguard v0.22.5/go.mod h1:+APmZGThMBWjnMlKiSM1X7MVpbIVewen2MTkqWkA/zE= -github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= github.com/aws/aws-sdk-go v1.44.263/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/aws/aws-sdk-go-v2 v1.8.1/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0= -github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= -github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2 v1.36.2 h1:Ub6I4lq/71+tPb/atswvToaLGVMxKZvjYDVOWEExOcU= github.com/aws/aws-sdk-go-v2 v1.36.2/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= -github.com/aws/aws-sdk-go-v2/config v1.6.1/go.mod h1:t/y3UPu0XEDy0cEw6mvygaBQaPzWiYAxfP2SzgtvclA= github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4= github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9Q5CsPShmyo= github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko= -github.com/aws/aws-sdk-go-v2/credentials v1.3.3/go.mod h1:oVieKMT3m9BSfqhOfuQ+E0j/yN84ZAJ7Qv8Sfume/ak= github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o= github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw= github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w= -github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0/go.mod h1:UVFtSYSWCHj2+brBLDHUdlJXmz8LxUpZhA+Ewypc+xQ= -github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.7 h1:FZB15YK2h/l2wO9YXvXr7/mZ5uOJIsLNZIePlHarAwg= -github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.7/go.mod h1:xTMr0gSUW6H6nJJVV257wWlk9257DwZ7EFhPFn3itgo= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1/go.mod h1:+GTydg3uHmVlQdkRoetz6VHKbOMEYof70m19IpMLifc= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 h1:zeN9UtUlA6FTx0vFSayxSX32HDw73Yb6Hh2izDSFxXY= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10/go.mod h1:3HKuexPDcwLWPaqpW2UR/9n8N/u/3CKcGAzSs8p8u8g= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.0.4/go.mod h1:W5gGbtNXFpF9/ssYZTaItzG/B+j0bjTnwStiCP2AtWU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.33 h1:knLyPMw3r3JsU8MFHWctE4/e2qWbPaxDYLlohPvnY8c= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.33/go.mod h1:EBp2HQ3f+XCB+5J+IoEbGhoV7CpJbnrsd4asNXmTL0A= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.33 h1:K0+Ne08zqti8J9jwENxZ5NoUyBnaFDTu3apwQJWrwwA= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.33/go.mod h1:K97stwwzaWzmqxO8yLGHhClbVW1tC6VT1pDLk1pGrq4= -github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1/go.mod h1:Pv3WenDjI0v2Jl7UaMFIIbPOBbhn33RmmAmGgkXDoqY= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= @@ -924,55 +904,39 @@ github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.14 h1:RdaxtOI+W9CqnFDLXkoF github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.14/go.mod h1:fwajvO52Dn+DVxtXQJeGLfnNq+Qm+Pul56XtOKCyN00= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.45.3 h1:va7zt8/kkg5zR0TX2r7wCXssdZ4+blRxbsA6IS9XXYI= github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.45.3/go.mod h1:CijDCaRp5sH8QM0LqImyzy5roG8cOtgp2Abj0V/4luk= -github.com/aws/aws-sdk-go-v2/service/dynamodb v1.5.0/go.mod h1:XY5YhCS9SLul3JSQ08XG/nfxXxrkh6RR21XPq/J//NY= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.40.0 h1:OoQO3OUzwhNGNyTLsNe0Scre8QxHtZZn/7yY96K/PNI= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.40.0/go.mod h1:FcMiR2AALpkrpik6JzbYu+iEfktzrs3XOq5Shk9nvik= -github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.4.0/go.mod h1:bYsEP8w5YnbYyrx/Zi5hy4hTwRRQISSJS3RWrsGRijg= -github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.1 h1:kZR1TZ0VYcRK2LFiFt61EReplssCq9SZO4gVSYV1Aww= -github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.1/go.mod h1:ifHRXsCyLVIdvDaAScQnM7jtsXtoBZFmyZiLMex8FTA= github.com/aws/aws-sdk-go-v2/service/ec2 v1.203.1 h1:ZgY9zeVAe+54Qa7o1GXKRNTez79lffCeJSSinhl+qec= github.com/aws/aws-sdk-go-v2/service/ec2 v1.203.1/go.mod h1:0naMk66LtdeTmE+1CWQTKwtzOQ2t8mavOhMhR0Pv1m0= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.3.0/go.mod h1:v8ygadNyATSm6elwJ/4gzJwcFhri9RqS8skgHKiwXPU= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 h1:YPYe6ZmvUfDDDELqEKtAd6bo8zxhkm+XEFEzQisqUIE= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17/go.mod h1:oBtcnYua/CgzCWYN7NZ5j7PotFDaFSUjCYVTtfyn7vw= -github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.1.0/go.mod h1:enkU5tq2HoXY+ZMiQprgF3Q83T3PbO77E83yXXzRZWE= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13 h1:eWoHfLIzYeUtJEuoUmD5PwTE+fLaIPN9NZ7UXd9CW0s= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13/go.mod h1:x5t8Ve0J7JK9VHKSPSRAdBrWAgr/5hH3UeCFMLoyUGQ= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3/go.mod h1:7gcsONBmFoCcKrAqrm95trrMd2+C/ReYKP7Vfu8yHHA= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.14 h1:2scbY6//jy/s8+5vGrk7l1+UtHl0h9A4MjOO2k/TM2E= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.14/go.mod h1:bRpZPHZpSe5YRHmPfK3h1M7UBFCn2szHzyx0rw04zro= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 h1:246A4lSTXWJw/rmlQI+TT2OcqeDMKBdyjEQrafMaQdA= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15/go.mod h1:haVfg3761/WF7YPuJOER2MP0k4UAXyHaLclKXB6usDg= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.6 h1:yN7WEx9ksiP5+9zdKtoQYrUT51HvYw+EA1TXsElvMyk= github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.6/go.mod h1:j8MNat6qtGw5OoEACRbWtT8r5my4nRWfM/6Uk+NsuC4= github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 h1:hT8ZAZRIfqBqHbzKTII+CIiY8G2oC9OpLedkZ51DWl8= github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3/go.mod h1:Lcxzg5rojyVPU/0eFwLtcyTaek/6Mtic5B1gJo7e/zE= -github.com/aws/aws-sdk-go-v2/service/sso v1.3.3/go.mod h1:Jgw5O+SK7MZ2Yi9Yvzb4PggAPYaFSliiQuWR0hNjexk= github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI= github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 h1:rLnYAfXQ3YAccocshIH5mzNNwZBkBo+bP6EhIxak6Hw= github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 h1:JnhTZR3PiYDNKlXy50/pNeix9aGMo6lLpXwJ1mw8MD4= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY= -github.com/aws/aws-sdk-go-v2/service/sts v1.6.2/go.mod h1:RBhoMJB8yFToaCnbe0jNq5Dcdy0jp6LhHqg55rjClkM= github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8= github.com/aws/aws-sdk-go-v2/service/sts v1.33.12 h1:fqg6c1KVrc3SYWma/egWue5rKI4G2+M4wMQN2JosNAA= github.com/aws/aws-sdk-go-v2/service/sts v1.33.12/go.mod h1:7Yn+p66q/jt38qMoVfNvjbm3D89mGBnkwDcijgtih8w= github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.27.4 h1:glNNLfVzW88jz83oPZ4gXndJL7VDDANHowCoJU673OU= github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.27.4/go.mod h1:VUHrcV1XoUd6ZWzIMal9CeAA2EiKkAhmImuRGhNbaxg= -github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4= -github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -1268,7 +1232,6 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -1288,7 +1251,6 @@ github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOr github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= -github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY= github.com/go-resty/resty/v2 v2.13.1 h1:x+LHXBI2nMB1vqndymf26quycC4aggYJ7DECYbiz03g= github.com/go-resty/resty/v2 v2.13.1/go.mod h1:GznXlLxkq6Nh4sU59rPmUw3VtgpO3aS96ORAI6Q7d+0= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -1387,7 +1349,6 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= @@ -1449,7 +1410,6 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -1459,7 +1419,6 @@ github.com/google/protobuf v3.11.4+incompatible/go.mod h1:lUQ9D1ePzbH2PrIS7ob/bj github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -1530,8 +1489,6 @@ github.com/gwos/tcg/sdk v0.0.0-20240830123415-f8a34bba6358 h1:QmKzhYk6KMjUutu9Sy github.com/gwos/tcg/sdk v0.0.0-20240830123415-f8a34bba6358/go.mod h1:h40FJV0HuULqXSSKf7kfCbOxEcQAD74a5e2LC2+rYiQ= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= -github.com/harlow/kinesis-consumer v0.3.6-0.20240916192723-43900507c911 h1:eLNkr0OcBl7pzM6DCLSgVp3VQyS5ZrLnanXPqH5EmE0= -github.com/harlow/kinesis-consumer v0.3.6-0.20240916192723-43900507c911/go.mod h1:jTE9kH7IVx841D0GgxjykKieSP1yDSckuEg5ceSCjEU= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/api v1.29.2 h1:aYyRn8EdE2mSfG14S1+L9Qkjtz8RzmaWh6AcNGRNwPw= github.com/hashicorp/consul/api v1.29.2/go.mod h1:0YObcaLNDSbtlgzIRtmRXI1ZkeuK0trCBxwZQ4MYnIk= @@ -1722,7 +1679,6 @@ github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/rasw github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= -github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= @@ -1808,7 +1764,6 @@ github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b/go.mod h1: github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linkedin/goavro/v2 v2.13.0 h1:L8eI8GcuciwUkt41Ej62joSZS4kKaYIUdze+6for9NU= @@ -1837,7 +1792,6 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= -github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -1891,7 +1845,6 @@ github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ= github.com/mdlayher/vsock v1.2.1 h1:pC1mTJTvjo1r9n9fbm7S1j04rCgCzhCOS5DY0zqHlnQ= github.com/mdlayher/vsock v1.2.1/go.mod h1:NRfCibel++DgeMD8z/hP+PPTjlNJsdPOmxcnENvE+SE= -github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/mholt/archiver/v3 v3.5.0/go.mod h1:qqTTPUK/HZPFgFQ/TJ3BzvTpF/dPtFVJXdQbCmeMxwc= github.com/microsoft/ApplicationInsights-Go v0.4.4 h1:G4+H9WNs6ygSCe6sUyxRc2U81TI5Es90b2t/MwX5KqY= github.com/microsoft/ApplicationInsights-Go v0.4.4/go.mod h1:fKRUseBqkw6bDiXTs3ESTiU/4YTIHsQS4W3fP2ieF4U= @@ -2012,12 +1965,6 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108 github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= -github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU= -github.com/onsi/ginkgo/v2 v2.1.6/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk= -github.com/onsi/ginkgo/v2 v2.3.0/go.mod h1:Eew0uilEqZmIEZr8JrvYlvOM7Rr6xzTmMV8AyFNU9d0= -github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo= -github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw= github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= @@ -2025,13 +1972,6 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= -github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= -github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= -github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= -github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ1tuM= -github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= -github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM= github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.101.0 h1:TCQYvGS2MKTotOTQDnHUSd4ljEzXRzHXopdv71giKWU= @@ -2198,7 +2138,6 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG github.com/robinson/gos7 v0.0.0-20240315073918-1f14519e4846 h1:CnAbtX0j07ZVR/TnD5V6ypFTrASJlfr+fc4OY2da9eg= github.com/robinson/gos7 v0.0.0-20240315073918-1f14519e4846/go.mod h1:AMHIeh1KJ7Xa2RVOMHdv9jXKrpw0D4EWGGQMHLb2doc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= -github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= @@ -2235,7 +2174,6 @@ github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/sensu/sensu-go/api/core/v2 v2.16.0 h1:HOq4rFkQ1S5ZjxmMTLc5J5mAbECrnKWvtXXbMqr3j9s= github.com/sensu/sensu-go/api/core/v2 v2.16.0/go.mod h1:MjM7+MCGEyTAgaZ589SiGHwYiaYF7N/58dU0J070u/0= -github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= github.com/shirou/gopsutil/v4 v4.24.12 h1:qvePBOk20e0IKA1QXrIIU+jmk+zEiYVVx06WjBRlZo4= @@ -2275,14 +2213,12 @@ github.com/sleepinggenius2/gosmi v0.4.4/go.mod h1:l8OniPmd3bJzw0MXP2/qh7AhP/e+bT github.com/smallstep/assert v0.0.0-20200723003110-82e2b9b3b262 h1:unQFBIznI+VYD1/1fApl1A+9VcBk+9dcqGfnePY87LY= github.com/smallstep/assert v0.0.0-20200723003110-82e2b9b3b262/go.mod h1:MyOHs9Po2fbM1LHej6sBUT8ozbxmMOFG+E+rx/GSGuc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= -github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs= github.com/smartystreets/gunit v1.1.3/go.mod h1:EH5qMBab2UclzXUcpR8b93eHsIlp9u+pDQIRp5DZNzQ= github.com/snowflakedb/gosnowflake v1.11.2 h1:eAMsxrCiC6ij5wX3dHx1TQCBOdDmCK062Ir8rndUkRg= github.com/snowflakedb/gosnowflake v1.11.2/go.mod h1:WFe+8mpsapDaQjHX6BqJBKtfQCGlGD3lHKeDsKfpx2A= @@ -2370,13 +2306,8 @@ github.com/tidwall/wal v1.1.8 h1:2qDSGdAdjaY3PEvHRva+9UFqgk+ef7cOiW1Qn5JH1y0= github.com/tidwall/wal v1.1.8/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY= github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro= -github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= -github.com/tj/go-buffer v1.0.1/go.mod h1:iyiJpfFcR2B9sXu7KvjbT9fpM4mOelRSDTbntVj52Uc= -github.com/tj/go-elastic v0.0.0-20171221160941-36157cbbebc2/go.mod h1:WjeM0Oo1eNAjXGDx2yma7uG2XoyRZTq1uv3M/o7imD0= -github.com/tj/go-kinesis v0.0.0-20171128231115-08b17f58cb1b/go.mod h1:/yhzCV0xPfx6jb1bBgRFjl5lytqVqZXEaeqWP8lTEao= -github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= @@ -2448,7 +2379,6 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.7.8 h1:iERMLn0/QJeHFhxSt3p6PeN9mGnvIKSpG9YYorDMnic= github.com/yuin/goldmark v1.7.8/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E= -github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= github.com/yunify/qingstor-sdk-go/v3 v3.2.0 h1:9sB2WZMgjwSUNZhrgvaNGazVltoFUUfuS9f0uCWtTr8= @@ -2558,7 +2488,6 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= -golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -2646,7 +2575,6 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -2908,11 +2836,9 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -3070,7 +2996,6 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= -golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= @@ -3427,7 +3352,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/inputs/kinesis_consumer/README.md b/plugins/inputs/kinesis_consumer/README.md index 985645ba6..86efc1515 100644 --- a/plugins/inputs/kinesis_consumer/README.md +++ b/plugins/inputs/kinesis_consumer/README.md @@ -1,7 +1,10 @@ # Kinesis Consumer Input Plugin -The [Kinesis][kinesis] consumer plugin reads from a Kinesis data stream -and creates metrics using one of the supported [input data formats][]. +This plugin consumes records from [AWS Kinesis][kinesis] data stream and +creates metrics using one of the supported [data formats][data_formats]. + +[kinesis]: https://aws.amazon.com/kinesis/ +[data_formats]: /docs/DATA_FORMATS_INPUT.md ## Service Input @@ -58,9 +61,19 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Kinesis StreamName must exist prior to starting telegraf. streamname = "StreamName" - ## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported) + ## Shard iterator type + ## Available options: 'TRIM_HORIZON' (first in non-expired) and 'LATEST' # shard_iterator_type = "TRIM_HORIZON" + ## Interval for checking for new records + ## Please consider limits for getting records documented here: + ## https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + # poll_interval = "250ms" + + ## Interval for scanning for new shards created when resharding + ## If set to zero, shards are only scanned once on startup. + # shard_update_interval = "30s" + ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to ## outputs before acknowledging them to the original broker to ensure data @@ -73,27 +86,23 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## setting it too low may never flush the broker's messages. # max_undelivered_messages = 1000 - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "influx" - - ## - ## The content encoding of the data from kinesis - ## If you are processing a cloudwatch logs kinesis stream then set this to "gzip" - ## as AWS compresses cloudwatch log data before it is sent to kinesis (aws - ## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding - ## is done automatically by the golang sdk, as data is read from kinesis) - ## + ## Content encoding of the record data + ## If you are processing a cloudwatch logs kinesis stream then set this to + ## "gzip" as AWS compresses cloudwatch log data before it is sent to kinesis. # content_encoding = "identity" - ## Optional - ## Configuration for a dynamodb checkpoint - [inputs.kinesis_consumer.checkpoint_dynamodb] - ## unique name for this consumer - app_name = "default" - table_name = "default" + ## Data format of the records to consume + ## See https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" + + ## Optional: Configuration for DynamoDB backend to store positions in the stream + # [inputs.kinesis_consumer.checkpoint_dynamodb] + # ## Unique name for this consumer + # app_name = "default" + # ## Table to store the sequence numbers in + # table_name = "default" + # ## Interval for persisting data to limit write operations + # # interval = "10s" ``` ### Required AWS IAM permissions @@ -119,9 +128,6 @@ Partition key: namespace Sort key: shard_id ``` -[kinesis]: https://aws.amazon.com/kinesis/ -[input data formats]: /docs/DATA_FORMATS_INPUT.md - ## Metrics ## Example Output diff --git a/plugins/inputs/kinesis_consumer/consumer.go b/plugins/inputs/kinesis_consumer/consumer.go new file mode 100644 index 000000000..f1af7e8d8 --- /dev/null +++ b/plugins/inputs/kinesis_consumer/consumer.go @@ -0,0 +1,355 @@ +package kinesis_consumer + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + + "github.com/influxdata/telegraf" +) + +type recordHandler func(ctx context.Context, shard string, r *types.Record) + +type shardConsumer struct { + seqnr string + interval time.Duration + log telegraf.Logger + + client *kinesis.Client + params *kinesis.GetShardIteratorInput + + onMessage recordHandler +} + +func (c *shardConsumer) consume(ctx context.Context, shard string) ([]types.ChildShard, error) { + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + + // Get the first shard iterator + iter, err := c.iterator(ctx) + if err != nil { + return nil, fmt.Errorf("getting first shard iterator failed: %w", err) + } + + for { + // Get new records from the shard + resp, err := c.client.GetRecords(ctx, &kinesis.GetRecordsInput{ + ShardIterator: iter, + }) + if err != nil { + // Handle recoverable errors + var throughputErr *types.ProvisionedThroughputExceededException + var expiredIterErr *types.ExpiredIteratorException + switch { + case errors.As(err, &throughputErr): + // Wait a second before trying again as suggested by + // https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + c.log.Tracef("throughput exceeded when getting records for shard %s...", shard) + time.Sleep(time.Second) + continue + case errors.As(err, &expiredIterErr): + c.log.Tracef("iterator expired for shard %s...", shard) + if iter, err = c.iterator(ctx); err != nil { + return nil, fmt.Errorf("getting shard iterator failed: %w", err) + } + continue + case errors.Is(err, context.Canceled): + return nil, nil + default: + c.log.Tracef("get-records error is of type %T", err) + return nil, fmt.Errorf("getting records failed: %w", err) + } + } + c.log.Tracef("read %d records for shard %s...", len(resp.Records), shard) + + // Check if we fully read the shard + if resp.NextShardIterator == nil { + return resp.ChildShards, nil + } + iter = resp.NextShardIterator + + // Process the records and keep track of the last sequence number + // consumed for recreating the iterator. + for _, r := range resp.Records { + c.onMessage(ctx, shard, &r) + c.seqnr = *r.SequenceNumber + if errors.Is(ctx.Err(), context.Canceled) { + return nil, nil + } + } + + // Wait for the poll interval to pass or cancel + select { + case <-ctx.Done(): + return nil, nil + case <-ticker.C: + continue + } + } +} + +func (c *shardConsumer) iterator(ctx context.Context) (*string, error) { + for { + resp, err := c.client.GetShardIterator(ctx, c.params) + if err != nil { + var throughputErr *types.ProvisionedThroughputExceededException + if errors.As(err, &throughputErr) { + // We called the function too often and should wait a bit + // until trying again + c.log.Tracef("throughput exceeded when getting iterator for shard %s...", *c.params.ShardId) + time.Sleep(time.Second) + continue + } + + return nil, err + } + c.log.Tracef("successfully updated iterator for shard %s (%s)...", *c.params.ShardId, c.seqnr) + return resp.ShardIterator, nil + } +} + +type consumer struct { + config aws.Config + stream string + iterType types.ShardIteratorType + pollInterval time.Duration + shardUpdateInterval time.Duration + log telegraf.Logger + + onMessage recordHandler + position func(shard string) string + + client *kinesis.Client + + shardsConsumed map[string]bool + shardConsumers map[string]*shardConsumer + + wg sync.WaitGroup + + sync.Mutex +} + +func (c *consumer) init() error { + if c.stream == "" { + return errors.New("stream cannot be empty") + } + if c.pollInterval <= 0 { + return errors.New("invalid poll interval") + } + + if c.onMessage == nil { + return errors.New("message handler is undefined") + } + + c.shardsConsumed = make(map[string]bool) + c.shardConsumers = make(map[string]*shardConsumer) + + return nil +} + +func (c *consumer) start(ctx context.Context) { + // Setup the client + c.client = kinesis.NewFromConfig(c.config) + + // Do the initial discovery of shards + if err := c.updateShardConsumers(ctx); err != nil { + c.log.Errorf("Initializing shards failed: %v", err) + } + + // If the consumer has a shard-update interval, use a ticker to update + // available shards on a regular basis + if c.shardUpdateInterval <= 0 { + return + } + ticker := time.NewTicker(c.shardUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := c.updateShardConsumers(ctx); err != nil { + c.log.Errorf("Updating shards failed: %v", err) + } + } + } +} + +func (c *consumer) updateShardConsumers(ctx context.Context) error { + // List all shards of the given stream + var availableShards []types.Shard + req := &kinesis.ListShardsInput{StreamName: aws.String(c.stream)} + for { + resp, err := c.client.ListShards(ctx, req) + if err != nil { + return fmt.Errorf("listing shards failed: %w", err) + } + availableShards = append(availableShards, resp.Shards...) + + if resp.NextToken == nil { + break + } + + req = &kinesis.ListShardsInput{NextToken: resp.NextToken} + } + c.log.Tracef("got %d shards during update", len(availableShards)) + + // All following operations need to be locked to create a consistent + // state of the shards and consumers + c.Lock() + defer c.Unlock() + + // Filter out all shards actively consumed already + inactiveShards := make([]types.Shard, 0, len(availableShards)) + for _, shard := range availableShards { + id := *shard.ShardId + if _, found := c.shardConsumers[id]; found { + c.log.Tracef("shard %s is actively consumed...", id) + continue + } + c.log.Tracef("shard %s is not actively consumed...", id) + inactiveShards = append(inactiveShards, shard) + } + + // Fill the shards already consumed and get the positions if the consumer + // is backed by an iterator store + newShards := make([]types.Shard, 0, len(inactiveShards)) + seqnrs := make(map[string]string, len(inactiveShards)) + for _, shard := range inactiveShards { + id := *shard.ShardId + + if c.shardsConsumed[id] { + c.log.Tracef("shard %s is already fully consumed...", id) + continue + } + c.log.Tracef("shard %s is not fully consumed...", id) + + // Retrieve the shard position from the store + if c.position != nil { + seqnr := c.position(id) + if seqnr == "" { + // A truely new shard + newShards = append(newShards, shard) + c.log.Tracef("shard %s is new...", id) + continue + } + seqnrs[id] = seqnr + + // Check if we already fully consumed for closed shards + end := shard.SequenceNumberRange.EndingSequenceNumber + if end != nil && *end == seqnr { + c.log.Tracef("shard %s is closed and already fully consumed...", id) + c.shardsConsumed[id] = true + continue + } + c.log.Tracef("shard %s is not yet fully consumed...", id) + } + + // The shard is not fully consumed yet so save the sequence number + // and the shard as "new". + newShards = append(newShards, shard) + } + + // Filter all shards already fully consumed and create a new consumer for + // every remaining new shard respecting resharding artifacts + for _, shard := range newShards { + id := *shard.ShardId + + // Handle resharding by making sure all parents are consumed already + // before starting a consumer on a child shard. If parents are not + // consumed fully we ignore this shard here as it will be reported + // by the call to `GetRecords` as a child later. + if shard.ParentShardId != nil && *shard.ParentShardId != "" { + pid := *shard.ParentShardId + if !c.shardsConsumed[pid] { + c.log.Tracef("shard %s has parent %s which is not fully consumed yet...", id, pid) + continue + } + } + if shard.AdjacentParentShardId != nil && *shard.AdjacentParentShardId != "" { + pid := *shard.AdjacentParentShardId + if !c.shardsConsumed[pid] { + c.log.Tracef("shard %s has adjacent parent %s which is not fully consumed yet...", id, pid) + continue + } + } + + // Create a new consumer and start it + c.wg.Add(1) + go func(shardID string) { + defer c.wg.Done() + c.startShardConsumer(ctx, shardID, seqnrs[shardID]) + }(id) + } + + return nil +} + +func (c *consumer) startShardConsumer(ctx context.Context, id, seqnr string) { + c.log.Tracef("starting consumer for shard %s at sequence number %q...", id, seqnr) + sc := &shardConsumer{ + seqnr: seqnr, + interval: c.pollInterval, + log: c.log, + onMessage: c.onMessage, + client: c.client, + params: &kinesis.GetShardIteratorInput{ + ShardId: &id, + ShardIteratorType: c.iterType, + StreamName: &c.stream, + }, + } + if seqnr != "" { + sc.params.ShardIteratorType = types.ShardIteratorTypeAfterSequenceNumber + sc.params.StartingSequenceNumber = &seqnr + } + c.shardConsumers[id] = sc + + childs, err := sc.consume(ctx, id) + if err != nil { + c.log.Errorf("Consuming shard %s failed: %v", id, err) + return + } + c.log.Tracef("finished consuming shard %s", id) + + c.Lock() + defer c.Unlock() + + c.shardsConsumed[id] = true + delete(c.shardConsumers, id) + + for _, shard := range childs { + cid := *shard.ShardId + + startable := true + for _, pid := range shard.ParentShards { + startable = startable && c.shardsConsumed[pid] + } + if !startable { + c.log.Tracef("child shard %s of shard %s is not startable as parents are fully consumed yet...", cid, id) + continue + } + c.log.Tracef("child shard %s of shard %s is startable...", cid, id) + + var cseqnr string + if c.position != nil { + cseqnr = c.position(cid) + } + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.startShardConsumer(ctx, cid, cseqnr) + }() + } +} + +func (c *consumer) stop() { + c.wg.Wait() +} diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer.go b/plugins/inputs/kinesis_consumer/kinesis_consumer.go index 4c65aadef..87b272e58 100644 --- a/plugins/inputs/kinesis_consumer/kinesis_consumer.go +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer.go @@ -5,16 +5,15 @@ import ( "context" _ "embed" "errors" + "fmt" "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/kinesis" - consumer "github.com/harlow/kinesis-consumer" - "github.com/harlow/kinesis-consumer/store/ddb" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" common_aws "github.com/influxdata/telegraf/plugins/common/aws" "github.com/influxdata/telegraf/plugins/inputs" @@ -28,44 +27,46 @@ var once sync.Once type KinesisConsumer struct { StreamName string `toml:"streamname"` ShardIteratorType string `toml:"shard_iterator_type"` + PollInterval config.Duration `toml:"poll_interval"` + ShardUpdateInterval config.Duration `toml:"shard_update_interval"` DynamoDB *dynamoDB `toml:"checkpoint_dynamodb"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` ContentEncoding string `toml:"content_encoding"` Log telegraf.Logger `toml:"-"` common_aws.CredentialConfig - cons *consumer.Consumer - parser telegraf.Parser - cancel context.CancelFunc acc telegraf.TrackingAccumulator - sem chan struct{} + parser telegraf.Parser - checkpoint consumer.Store - checkpoints map[string]checkpoint - records map[telegraf.TrackingID]string - checkpointTex sync.Mutex - recordsTex sync.Mutex - wg sync.WaitGroup + cfg aws.Config + consumer *consumer + cancel context.CancelFunc + sem chan struct{} + + iteratorStore *store + + records map[telegraf.TrackingID]iterator + recordsTex sync.Mutex + + wg sync.WaitGroup contentDecodingFunc decodingFunc - - lastSeqNum string } type dynamoDB struct { - AppName string `toml:"app_name"` - TableName string `toml:"table_name"` -} - -type checkpoint struct { - streamName string - shardID string + AppName string `toml:"app_name"` + TableName string `toml:"table_name"` + Interval config.Duration `toml:"interval"` } func (*KinesisConsumer) SampleConfig() string { return sampleConfig } +func (k *KinesisConsumer) SetParser(parser telegraf.Parser) { + k.parser = parser +} + func (k *KinesisConsumer) Init() error { // Set defaults if k.MaxUndeliveredMessages < 1 { @@ -79,140 +80,132 @@ func (k *KinesisConsumer) Init() error { k.ContentEncoding = "identity" } + // Check input params + if k.StreamName == "" { + return errors.New("stream name cannot be empty") + } + f, err := getDecodingFunc(k.ContentEncoding) if err != nil { return err } k.contentDecodingFunc = f - return nil -} - -func (k *KinesisConsumer) SetParser(parser telegraf.Parser) { - k.parser = parser -} - -func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error { - return k.connect(acc) -} - -func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error { - if k.cons == nil { - return k.connect(acc) - } - // Enforce writing of last received sequence number - k.lastSeqNum = "" - - return nil -} - -func (k *KinesisConsumer) Stop() { - k.cancel() - k.wg.Wait() -} - -// GetCheckpoint wraps the checkpoint's GetCheckpoint function (called by consumer library) -func (k *KinesisConsumer) GetCheckpoint(streamName, shardID string) (string, error) { - return k.checkpoint.GetCheckpoint(streamName, shardID) -} - -// SetCheckpoint wraps the checkpoint's SetCheckpoint function (called by consumer library) -func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber string) error { - if sequenceNumber == "" { - return errors.New("sequence number should not be empty") + if k.DynamoDB != nil { + if k.DynamoDB.Interval <= 0 { + k.DynamoDB.Interval = config.Duration(10 * time.Second) + } + k.iteratorStore = newStore(k.DynamoDB.AppName, k.DynamoDB.TableName, time.Duration(k.DynamoDB.Interval), k.Log) } - k.checkpointTex.Lock() - k.checkpoints[sequenceNumber] = checkpoint{streamName: streamName, shardID: shardID} - k.checkpointTex.Unlock() + k.records = make(map[telegraf.TrackingID]iterator, k.MaxUndeliveredMessages) + k.sem = make(chan struct{}, k.MaxUndeliveredMessages) - return nil -} - -func (k *KinesisConsumer) connect(acc telegraf.Accumulator) error { + // Setup the client to connect to the Kinesis service cfg, err := k.CredentialConfig.Credentials() if err != nil { return err } - if k.EndpointURL != "" { cfg.BaseEndpoint = &k.EndpointURL } + if k.Log.Level().Includes(telegraf.Trace) { + logWrapper := &telegrafLoggerWrapper{k.Log} + cfg.Logger = logWrapper + cfg.ClientLogMode = aws.LogRetries + } + k.cfg = cfg - logWrapper := &telegrafLoggerWrapper{k.Log} - cfg.Logger = logWrapper - cfg.ClientLogMode = aws.LogRetries - client := kinesis.NewFromConfig(cfg) + return nil +} - k.checkpoint = &noopStore{} - if k.DynamoDB != nil { - var err error - k.checkpoint, err = ddb.New( - k.DynamoDB.AppName, - k.DynamoDB.TableName, - ddb.WithDynamoClient(dynamodb.NewFromConfig(cfg)), - ddb.WithMaxInterval(time.Second*10), - ) - if err != nil { - return err +func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error { + k.acc = acc.WithTracking(k.MaxUndeliveredMessages) + + // Start the store if necessary + if k.iteratorStore != nil { + if err := k.iteratorStore.run(context.Background()); err != nil { + return fmt.Errorf("starting DynamoDB store failed: %w", err) } } - cons, err := consumer.New( - k.StreamName, - consumer.WithClient(client), - consumer.WithShardIteratorType(k.ShardIteratorType), - consumer.WithStore(k), - consumer.WithLogger(logWrapper), - ) - if err != nil { - return err - } - - k.cons = cons - - k.acc = acc.WithTracking(k.MaxUndeliveredMessages) - k.records = make(map[telegraf.TrackingID]string, k.MaxUndeliveredMessages) - k.checkpoints = make(map[string]checkpoint, k.MaxUndeliveredMessages) - k.sem = make(chan struct{}, k.MaxUndeliveredMessages) - ctx := context.Background() ctx, k.cancel = context.WithCancel(ctx) + // Setup the consumer + k.consumer = &consumer{ + config: k.cfg, + stream: k.StreamName, + iterType: types.ShardIteratorType(k.ShardIteratorType), + pollInterval: time.Duration(k.PollInterval), + shardUpdateInterval: time.Duration(k.ShardUpdateInterval), + log: k.Log, + onMessage: func(ctx context.Context, shard string, r *types.Record) { + // Checking for number of messages in flight and wait for a free + // slot in case there are too many + select { + case <-ctx.Done(): + return + case k.sem <- struct{}{}: + break + } + + if err := k.onMessage(k.acc, shard, r); err != nil { + seqnr := *r.SequenceNumber + k.Log.Errorf("Processing message with sequence number %q in shard %s failed: %v", seqnr, shard, err) + <-k.sem + } + }, + } + + // Link in the backing iterator store + if k.iteratorStore != nil { + k.consumer.position = func(shard string) string { + seqnr, err := k.iteratorStore.get(ctx, k.StreamName, shard) + if err != nil && !errors.Is(err, errNotFound) { + k.Log.Errorf("retrieving sequence number for shard %q failed: %s", shard, err) + } + + return seqnr + } + } + if err := k.consumer.init(); err != nil { + return fmt.Errorf("initializing consumer failed: %w", err) + } + + // Start the go-routine handling metrics delivered to the output k.wg.Add(1) go func() { defer k.wg.Done() k.onDelivery(ctx) }() + // Start the go-routine handling message consumption k.wg.Add(1) go func() { defer k.wg.Done() - err := k.cons.Scan(ctx, func(r *consumer.Record) error { - select { - case <-ctx.Done(): - return ctx.Err() - case k.sem <- struct{}{}: - break - } - if err := k.onMessage(k.acc, r); err != nil { - <-k.sem - k.Log.Errorf("Scan parser error: %v", err) - } - - return nil - }) - if err != nil { - k.cancel() - k.Log.Errorf("Scan encountered an error: %v", err) - k.cons = nil - } + k.consumer.start(ctx) }() return nil } -func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error { +func (*KinesisConsumer) Gather(telegraf.Accumulator) error { + return nil +} + +func (k *KinesisConsumer) Stop() { + k.cancel() + k.wg.Wait() + k.consumer.stop() + + if k.iteratorStore != nil { + k.iteratorStore.stop() + } +} + +// onMessage is called for new messages consumed from Kinesis +func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, shard string, r *types.Record) error { data, err := k.contentDecodingFunc(r.Data) if err != nil { return err @@ -228,61 +221,58 @@ func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consume }) } + seqnr := *r.SequenceNumber + k.recordsTex.Lock() + defer k.recordsTex.Unlock() + id := acc.AddTrackingMetricGroup(metrics) - k.records[id] = *r.SequenceNumber - k.recordsTex.Unlock() + k.records[id] = iterator{shard: shard, seqnr: seqnr} return nil } +// onDelivery is called for every metric successfully delivered to the outputs func (k *KinesisConsumer) onDelivery(ctx context.Context) { for { select { case <-ctx.Done(): return case info := <-k.acc.Delivered(): - k.recordsTex.Lock() - sequenceNum, ok := k.records[info.ID()] - if !ok { - k.recordsTex.Unlock() - continue + // Store the metric iterator in DynamoDB if configured + if k.iteratorStore != nil { + k.storeDelivered(info.ID()) } + + // Reduce the number of undelivered messages by reading from the channel <-k.sem - delete(k.records, info.ID()) - k.recordsTex.Unlock() - - if !info.Delivered() { - k.Log.Debug("Metric group failed to process") - continue - } - - if k.lastSeqNum != "" { - continue - } - - // Store the sequence number at least once per gather cycle using the checkpoint - // storage (usually DynamoDB). - k.checkpointTex.Lock() - chk, ok := k.checkpoints[sequenceNum] - if !ok { - k.checkpointTex.Unlock() - continue - } - delete(k.checkpoints, sequenceNum) - k.checkpointTex.Unlock() - - k.Log.Tracef("persisting sequence number %q for stream %q and shard %q", sequenceNum) - k.lastSeqNum = sequenceNum - if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil { - k.Log.Errorf("Setting checkpoint failed: %v", err) - } } } } +func (k *KinesisConsumer) storeDelivered(id telegraf.TrackingID) { + k.recordsTex.Lock() + defer k.recordsTex.Unlock() + + // Find the iterator belonging to the delivered message + iter, ok := k.records[id] + if !ok { + k.Log.Debugf("No iterator found for delivered metric %v!", id) + return + } + + // Remove metric + delete(k.records, id) + + // Store the iterator in the database + k.iteratorStore.set(k.StreamName, iter.shard, iter.seqnr) +} + func init() { inputs.Add("kinesis_consumer", func() telegraf.Input { - return &KinesisConsumer{} + return &KinesisConsumer{ + PollInterval: config.Duration(250 * time.Millisecond), + ShardUpdateInterval: config.Duration(30 * time.Second), + } }) } diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go index b48372571..c9574750e 100644 --- a/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go @@ -6,7 +6,6 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - consumer "github.com/harlow/kinesis-consumer" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -16,6 +15,7 @@ import ( func TestInvalidCoding(t *testing.T) { plugin := &KinesisConsumer{ + StreamName: "foo", ContentEncoding: "notsupported", } require.ErrorContains(t, plugin.Init(), "unknown content encoding") @@ -62,32 +62,25 @@ func TestOnMessage(t *testing.T) { tests := []struct { name string encoding string - records map[telegraf.TrackingID]string - args *consumer.Record + record *types.Record expectedNumber int expectedContent string }{ { name: "test no compression", encoding: "none", - records: make(map[telegraf.TrackingID]string), - args: &consumer.Record{ - Record: types.Record{ - Data: notZippedBytes, - SequenceNumber: aws.String("anything"), - }, + record: &types.Record{ + Data: notZippedBytes, + SequenceNumber: aws.String("anything"), }, expectedNumber: 2, expectedContent: "bob", }, { - name: "test no compression via empty string for ContentEncoding", - records: make(map[telegraf.TrackingID]string), - args: &consumer.Record{ - Record: types.Record{ - Data: notZippedBytes, - SequenceNumber: aws.String("anything"), - }, + name: "test no compression via empty string for ContentEncoding", + record: &types.Record{ + Data: notZippedBytes, + SequenceNumber: aws.String("anything"), }, expectedNumber: 2, expectedContent: "bob", @@ -95,24 +88,18 @@ func TestOnMessage(t *testing.T) { { name: "test no compression via identity ContentEncoding", encoding: "identity", - records: make(map[telegraf.TrackingID]string), - args: &consumer.Record{ - Record: types.Record{ - Data: notZippedBytes, - SequenceNumber: aws.String("anything"), - }, + record: &types.Record{ + Data: notZippedBytes, + SequenceNumber: aws.String("anything"), }, expectedNumber: 2, expectedContent: "bob", }, { - name: "test no compression via no ContentEncoding", - records: make(map[telegraf.TrackingID]string), - args: &consumer.Record{ - Record: types.Record{ - Data: notZippedBytes, - SequenceNumber: aws.String("anything"), - }, + name: "test no compression via no ContentEncoding", + record: &types.Record{ + Data: notZippedBytes, + SequenceNumber: aws.String("anything"), }, expectedNumber: 2, expectedContent: "bob", @@ -120,12 +107,9 @@ func TestOnMessage(t *testing.T) { { name: "test gzip compression", encoding: "gzip", - records: make(map[telegraf.TrackingID]string), - args: &consumer.Record{ - Record: types.Record{ - Data: gzippedBytes, - SequenceNumber: aws.String("anything"), - }, + record: &types.Record{ + Data: gzippedBytes, + SequenceNumber: aws.String("anything"), }, expectedNumber: 1, expectedContent: "bob", @@ -133,12 +117,9 @@ func TestOnMessage(t *testing.T) { { name: "test zlib compression", encoding: "zlib", - records: make(map[telegraf.TrackingID]string), - args: &consumer.Record{ - Record: types.Record{ - Data: zlibBytpes, - SequenceNumber: aws.String("anything"), - }, + record: &types.Record{ + Data: zlibBytpes, + SequenceNumber: aws.String("anything"), }, expectedNumber: 1, expectedContent: "bob", @@ -157,14 +138,16 @@ func TestOnMessage(t *testing.T) { // Setup plugin plugin := &KinesisConsumer{ + StreamName: "foo", ContentEncoding: tt.encoding, + Log: &testutil.Logger{}, parser: parser, - records: tt.records, + records: make(map[telegraf.TrackingID]iterator), } require.NoError(t, plugin.Init()) var acc testutil.Accumulator - require.NoError(t, plugin.onMessage(acc.WithTracking(tt.expectedNumber), tt.args)) + require.NoError(t, plugin.onMessage(acc.WithTracking(tt.expectedNumber), "test", tt.record)) actual := acc.GetTelegrafMetrics() require.Len(t, actual, tt.expectedNumber) diff --git a/plugins/inputs/kinesis_consumer/noop_store.go b/plugins/inputs/kinesis_consumer/noop_store.go deleted file mode 100644 index f400fdc71..000000000 --- a/plugins/inputs/kinesis_consumer/noop_store.go +++ /dev/null @@ -1,7 +0,0 @@ -package kinesis_consumer - -// noopStore implements the storage interface with discard -type noopStore struct{} - -func (noopStore) SetCheckpoint(_, _, _ string) error { return nil } -func (noopStore) GetCheckpoint(_, _ string) (string, error) { return "", nil } diff --git a/plugins/inputs/kinesis_consumer/sample.conf b/plugins/inputs/kinesis_consumer/sample.conf index afc1e6c45..c207caf06 100644 --- a/plugins/inputs/kinesis_consumer/sample.conf +++ b/plugins/inputs/kinesis_consumer/sample.conf @@ -30,9 +30,19 @@ ## Kinesis StreamName must exist prior to starting telegraf. streamname = "StreamName" - ## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported) + ## Shard iterator type + ## Available options: 'TRIM_HORIZON' (first in non-expired) and 'LATEST' # shard_iterator_type = "TRIM_HORIZON" + ## Interval for checking for new records + ## Please consider limits for getting records documented here: + ## https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + # poll_interval = "250ms" + + ## Interval for scanning for new shards created when resharding + ## If set to zero, shards are only scanned once on startup. + # shard_update_interval = "30s" + ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to ## outputs before acknowledging them to the original broker to ensure data @@ -45,24 +55,20 @@ ## setting it too low may never flush the broker's messages. # max_undelivered_messages = 1000 - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "influx" - - ## - ## The content encoding of the data from kinesis - ## If you are processing a cloudwatch logs kinesis stream then set this to "gzip" - ## as AWS compresses cloudwatch log data before it is sent to kinesis (aws - ## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding - ## is done automatically by the golang sdk, as data is read from kinesis) - ## + ## Content encoding of the record data + ## If you are processing a cloudwatch logs kinesis stream then set this to + ## "gzip" as AWS compresses cloudwatch log data before it is sent to kinesis. # content_encoding = "identity" - ## Optional - ## Configuration for a dynamodb checkpoint - [inputs.kinesis_consumer.checkpoint_dynamodb] - ## unique name for this consumer - app_name = "default" - table_name = "default" + ## Data format of the records to consume + ## See https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" + + ## Optional: Configuration for DynamoDB backend to store positions in the stream + # [inputs.kinesis_consumer.checkpoint_dynamodb] + # ## Unique name for this consumer + # app_name = "default" + # ## Table to store the sequence numbers in + # table_name = "default" + # ## Interval for persisting data to limit write operations + # # interval = "10s" diff --git a/plugins/inputs/kinesis_consumer/store.go b/plugins/inputs/kinesis_consumer/store.go new file mode 100644 index 000000000..e7a73bf38 --- /dev/null +++ b/plugins/inputs/kinesis_consumer/store.go @@ -0,0 +1,179 @@ +package kinesis_consumer + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + + "github.com/influxdata/telegraf" +) + +var errNotFound = errors.New("no iterator found") + +type iterator struct { + stream string + shard string + seqnr string + modified bool +} + +type store struct { + app string + table string + interval time.Duration + log telegraf.Logger + + client *dynamodb.Client + iterators map[string]iterator + + wg sync.WaitGroup + cancel context.CancelFunc + + sync.Mutex +} + +func newStore(app, table string, interval time.Duration, log telegraf.Logger) *store { + s := &store{ + app: app, + table: table, + interval: interval, + log: log, + } + + // Initialize the iterator states + s.iterators = make(map[string]iterator) + + return s +} + +func (s *store) run(ctx context.Context) error { + rctx, cancel := context.WithCancel(ctx) + s.cancel = cancel + + // Create a client to connect to DynamoDB + cfg, err := config.LoadDefaultConfig(rctx) + if err != nil { + return fmt.Errorf("loading default config failed: %w", err) + } + s.client = dynamodb.NewFromConfig(cfg) + + // Start the go-routine that pushes the states out to DynamoDB on a + // regular interval + s.wg.Add(1) + go func() { + defer s.wg.Done() + + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + + for { + select { + case <-rctx.Done(): + return + case <-ticker.C: + s.write(rctx) + } + } + }() + + return nil +} + +func (s *store) stop() { + ctx, cancel := context.WithTimeout(context.Background(), s.interval) + defer cancel() + s.write(ctx) + + s.cancel() + s.wg.Wait() +} + +func (s *store) write(ctx context.Context) { + s.Lock() + defer s.Unlock() + + for k, iter := range s.iterators { + // Only write iterators modified since the last write + if !iter.modified { + continue + } + + if _, err := s.client.PutItem( + ctx, + &dynamodb.PutItemInput{ + TableName: aws.String(s.table), + Item: map[string]types.AttributeValue{ + "namespace": &types.AttributeValueMemberS{Value: s.app + "-" + iter.stream}, + "shard_id": &types.AttributeValueMemberS{Value: iter.shard}, + "sequence_number": &types.AttributeValueMemberS{Value: iter.seqnr}, + }, + }); err != nil { + s.log.Errorf("storing iterator %s-%s/%s/%s failed: %v", s.app, iter.stream, iter.shard, iter.seqnr, err) + } + + // Mark state as saved + iter.modified = false + s.iterators[k] = iter + } +} + +func (s *store) set(stream, shard, seqnr string) { + s.Lock() + defer s.Unlock() + + s.iterators[stream+"/"+shard] = iterator{ + stream: stream, + shard: shard, + seqnr: seqnr, + modified: true, + } +} + +func (s *store) get(ctx context.Context, stream, shard string) (string, error) { + s.Lock() + defer s.Unlock() + + // Return the cached result if possible + if iter, found := s.iterators[stream+"/"+shard]; found { + return iter.seqnr, nil + } + + // Retrieve the information from the database + resp, err := s.client.GetItem(ctx, &dynamodb.GetItemInput{ + TableName: aws.String(s.table), + ConsistentRead: aws.Bool(true), + Key: map[string]types.AttributeValue{ + "namespace": &types.AttributeValueMemberS{Value: s.app + "-" + stream}, + "shard_id": &types.AttributeValueMemberS{Value: shard}, + }, + }) + if err != nil { + return "", err + } + + // Extract the sequence number + raw, found := resp.Item["sequence_number"] + if !found { + return "", fmt.Errorf("%w for %s-%s/%s", errNotFound, s.app, stream, shard) + } + seqnr, ok := raw.(*types.AttributeValueMemberS) + if !ok { + return "", fmt.Errorf("sequence number for %s-%s/%s is of unexpected type %T", s.app, stream, shard, raw) + } + + // Fill the cache + s.iterators[stream+"/"+shard] = iterator{ + stream: stream, + shard: shard, + seqnr: seqnr.Value, + } + + return seqnr.Value, nil +}