chore(inputs.kinesis_consumer): Replace consumer library by own implementation (#16332)

This commit is contained in:
Sven Rebhan 2025-03-05 16:24:24 +01:00 committed by GitHub
parent 65d09d89fa
commit f636016cb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 762 additions and 334 deletions

View File

@ -69,7 +69,6 @@ following works:
- github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/aws/protocol/eventstream/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/config [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/config/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/credentials [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/credentials/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/dynamodb/attributevalue/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/feature/ec2/imds [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/ec2/imds/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/feature/s3/manager [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/s3/manager/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/internal/configsources [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/internal/configsources/LICENSE.txt)
@ -79,7 +78,6 @@ following works:
- github.com/aws/aws-sdk-go-v2/service/cloudwatch [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/cloudwatch/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/cloudwatchlogs/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/dynamodb [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/dynamodb/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/dynamodbstreams [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/dynamodbstreams/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/ec2 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/ec2/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/accept-encoding/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/internal/checksum [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/checksum/LICENSE.txt)
@ -93,7 +91,6 @@ following works:
- github.com/aws/aws-sdk-go-v2/service/sts [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/sts/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/timestreamwrite [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/timestreamwrite/LICENSE.txt)
- github.com/aws/smithy-go [Apache License 2.0](https://github.com/aws/smithy-go/blob/main/LICENSE)
- github.com/awslabs/kinesis-aggregation/go [Apache License 2.0](https://github.com/awslabs/kinesis-aggregation/blob/master/LICENSE.txt)
- github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE)
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
- github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE)
@ -203,7 +200,6 @@ following works:
- github.com/gsterjov/go-libsecret [MIT License](https://github.com/gsterjov/go-libsecret/blob/master/LICENSE)
- github.com/gwos/tcg/sdk [MIT License](https://github.com/gwos/tcg/blob/master/LICENSE)
- github.com/hailocab/go-hostpool [MIT License](https://github.com/hailocab/go-hostpool/blob/master/LICENSE)
- github.com/harlow/kinesis-consumer [MIT License](https://github.com/harlow/kinesis-consumer/blob/master/LICENSE)
- github.com/hashicorp/consul/api [Mozilla Public License 2.0](https://github.com/hashicorp/consul/blob/main/api/LICENSE)
- github.com/hashicorp/errwrap [Mozilla Public License 2.0](https://github.com/hashicorp/errwrap/blob/master/LICENSE)
- github.com/hashicorp/go-cleanhttp [Mozilla Public License 2.0](https://github.com/hashicorp/go-cleanhttp/blob/master/LICENSE)

4
go.mod
View File

@ -111,7 +111,6 @@ require (
github.com/gosnmp/gosnmp v1.38.0
github.com/grid-x/modbus v0.0.0-20240503115206-582f2ab60a18
github.com/gwos/tcg/sdk v0.0.0-20240830123415-f8a34bba6358
github.com/harlow/kinesis-consumer v0.3.6-0.20240916192723-43900507c911
github.com/hashicorp/consul/api v1.29.2
github.com/hashicorp/go-uuid v1.0.3
github.com/hashicorp/golang-lru/v2 v2.0.7
@ -286,13 +285,11 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/awnumar/memcall v0.3.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.7 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13 // indirect
@ -301,7 +298,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect

76
go.sum
View File

@ -733,7 +733,6 @@ github.com/ClickHouse/ch-go v0.64.1 h1:FWpP+QU4KchgzpEekuv8YoI/fUc4H2r6Bwc5Wwrzv
github.com/ClickHouse/ch-go v0.64.1/go.mod h1:RBUynvczWwVzhS6Up9lPKlH1mrk4UAmle6uzCiW4Pkc=
github.com/ClickHouse/clickhouse-go/v2 v2.30.3 h1:m0VZqUNCJ7lOmZfmOE3HZUMixZHftKmZLqcrz2+UVHk=
github.com/ClickHouse/clickhouse-go/v2 v2.30.3/go.mod h1:V1aZaG0ctMbd8KVi+D4loXi97duWYtHiQHMCgipKJcI=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
@ -820,8 +819,6 @@ github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrI
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs=
github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI=
github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs=
github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI=
github.com/aliyun/alibaba-cloud-sdk-go v1.62.721 h1:OwLOwY8UfcuwE2eoKA2CxNewpUQv8Qnmpf7UcYNihvk=
@ -856,10 +853,6 @@ github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY=
github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE=
github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys=
github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc h1:LoL75er+LKDHDUfU5tRvFwxH0LjPpZN8OoG8Ll+liGU=
github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc/go.mod h1:w648aMHEgFYS6xb0KVMMtZ2uMeemhiKCuD2vj6gY52A=
github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 h1:Bmjk+DjIi3tTAU0wxGaFbfjGUqlxxSXARq9A96Kgoos=
@ -878,43 +871,30 @@ github.com/awnumar/memcall v0.3.0 h1:8b/3Sptrtgejj2kLgL6M5F2r4OzTf19CTllO+gIXUg8
github.com/awnumar/memcall v0.3.0/go.mod h1:8xOx1YbfyuCg3Fy6TO8DK0kZUua3V42/goA5Ru47E8w=
github.com/awnumar/memguard v0.22.5 h1:PH7sbUVERS5DdXh3+mLo8FDcl1eIeVjJVYMnyuYpvuI=
github.com/awnumar/memguard v0.22.5/go.mod h1:+APmZGThMBWjnMlKiSM1X7MVpbIVewen2MTkqWkA/zE=
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg=
github.com/aws/aws-sdk-go v1.44.263/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.8.1/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0=
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.36.2 h1:Ub6I4lq/71+tPb/atswvToaLGVMxKZvjYDVOWEExOcU=
github.com/aws/aws-sdk-go-v2 v1.36.2/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc=
github.com/aws/aws-sdk-go-v2/config v1.6.1/go.mod h1:t/y3UPu0XEDy0cEw6mvygaBQaPzWiYAxfP2SzgtvclA=
github.com/aws/aws-sdk-go-v2/config v1.18.25/go.mod h1:dZnYpD5wTW/dQF0rRNLVypB396zWCcPiBIvdvSWHEg4=
github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9Q5CsPShmyo=
github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko=
github.com/aws/aws-sdk-go-v2/credentials v1.3.3/go.mod h1:oVieKMT3m9BSfqhOfuQ+E0j/yN84ZAJ7Qv8Sfume/ak=
github.com/aws/aws-sdk-go-v2/credentials v1.13.24/go.mod h1:jYPYi99wUOPIFi0rhiOvXeSEReVOzBqFNOX5bXYoG2o=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.2.0/go.mod h1:UVFtSYSWCHj2+brBLDHUdlJXmz8LxUpZhA+Ewypc+xQ=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.7 h1:FZB15YK2h/l2wO9YXvXr7/mZ5uOJIsLNZIePlHarAwg=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.7/go.mod h1:xTMr0gSUW6H6nJJVV257wWlk9257DwZ7EFhPFn3itgo=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1/go.mod h1:+GTydg3uHmVlQdkRoetz6VHKbOMEYof70m19IpMLifc=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.3/go.mod h1:4Q0UFP0YJf0NrsEuEYHpM9fTSEVnD16Z3uyEF7J9JGM=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 h1:zeN9UtUlA6FTx0vFSayxSX32HDw73Yb6Hh2izDSFxXY=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10/go.mod h1:3HKuexPDcwLWPaqpW2UR/9n8N/u/3CKcGAzSs8p8u8g=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.0.4/go.mod h1:W5gGbtNXFpF9/ssYZTaItzG/B+j0bjTnwStiCP2AtWU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.33 h1:knLyPMw3r3JsU8MFHWctE4/e2qWbPaxDYLlohPvnY8c=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.33/go.mod h1:EBp2HQ3f+XCB+5J+IoEbGhoV7CpJbnrsd4asNXmTL0A=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.33 h1:K0+Ne08zqti8J9jwENxZ5NoUyBnaFDTu3apwQJWrwwA=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.33/go.mod h1:K97stwwzaWzmqxO8yLGHhClbVW1tC6VT1pDLk1pGrq4=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1/go.mod h1:Pv3WenDjI0v2Jl7UaMFIIbPOBbhn33RmmAmGgkXDoqY=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
@ -924,55 +904,39 @@ github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.14 h1:RdaxtOI+W9CqnFDLXkoF
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.14/go.mod h1:fwajvO52Dn+DVxtXQJeGLfnNq+Qm+Pul56XtOKCyN00=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.45.3 h1:va7zt8/kkg5zR0TX2r7wCXssdZ4+blRxbsA6IS9XXYI=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.45.3/go.mod h1:CijDCaRp5sH8QM0LqImyzy5roG8cOtgp2Abj0V/4luk=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.5.0/go.mod h1:XY5YhCS9SLul3JSQ08XG/nfxXxrkh6RR21XPq/J//NY=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.40.0 h1:OoQO3OUzwhNGNyTLsNe0Scre8QxHtZZn/7yY96K/PNI=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.40.0/go.mod h1:FcMiR2AALpkrpik6JzbYu+iEfktzrs3XOq5Shk9nvik=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.4.0/go.mod h1:bYsEP8w5YnbYyrx/Zi5hy4hTwRRQISSJS3RWrsGRijg=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.1 h1:kZR1TZ0VYcRK2LFiFt61EReplssCq9SZO4gVSYV1Aww=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.20.1/go.mod h1:ifHRXsCyLVIdvDaAScQnM7jtsXtoBZFmyZiLMex8FTA=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.203.1 h1:ZgY9zeVAe+54Qa7o1GXKRNTez79lffCeJSSinhl+qec=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.203.1/go.mod h1:0naMk66LtdeTmE+1CWQTKwtzOQ2t8mavOhMhR0Pv1m0=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.3.0/go.mod h1:v8ygadNyATSm6elwJ/4gzJwcFhri9RqS8skgHKiwXPU=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 h1:YPYe6ZmvUfDDDELqEKtAd6bo8zxhkm+XEFEzQisqUIE=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17/go.mod h1:oBtcnYua/CgzCWYN7NZ5j7PotFDaFSUjCYVTtfyn7vw=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.1.0/go.mod h1:enkU5tq2HoXY+ZMiQprgF3Q83T3PbO77E83yXXzRZWE=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13 h1:eWoHfLIzYeUtJEuoUmD5PwTE+fLaIPN9NZ7UXd9CW0s=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.13/go.mod h1:x5t8Ve0J7JK9VHKSPSRAdBrWAgr/5hH3UeCFMLoyUGQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3/go.mod h1:7gcsONBmFoCcKrAqrm95trrMd2+C/ReYKP7Vfu8yHHA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27/go.mod h1:EOwBD4J4S5qYszS5/3DpkejfuK+Z5/1uzICfPaZLtqw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.14 h1:2scbY6//jy/s8+5vGrk7l1+UtHl0h9A4MjOO2k/TM2E=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.14/go.mod h1:bRpZPHZpSe5YRHmPfK3h1M7UBFCn2szHzyx0rw04zro=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 h1:246A4lSTXWJw/rmlQI+TT2OcqeDMKBdyjEQrafMaQdA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15/go.mod h1:haVfg3761/WF7YPuJOER2MP0k4UAXyHaLclKXB6usDg=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.6 h1:yN7WEx9ksiP5+9zdKtoQYrUT51HvYw+EA1TXsElvMyk=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.32.6/go.mod h1:j8MNat6qtGw5OoEACRbWtT8r5my4nRWfM/6Uk+NsuC4=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 h1:hT8ZAZRIfqBqHbzKTII+CIiY8G2oC9OpLedkZ51DWl8=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3/go.mod h1:Lcxzg5rojyVPU/0eFwLtcyTaek/6Mtic5B1gJo7e/zE=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.3/go.mod h1:Jgw5O+SK7MZ2Yi9Yvzb4PggAPYaFSliiQuWR0hNjexk=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10/go.mod h1:ouy2P4z6sJN70fR3ka3wD3Ro3KezSxU6eKGQI2+2fjI=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 h1:rLnYAfXQ3YAccocshIH5mzNNwZBkBo+bP6EhIxak6Hw=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10/go.mod h1:AFvkxc8xfBe8XA+5St5XIHHrQQtkxqrRincx4hmMHOk=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 h1:JnhTZR3PiYDNKlXy50/pNeix9aGMo6lLpXwJ1mw8MD4=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.2/go.mod h1:RBhoMJB8yFToaCnbe0jNq5Dcdy0jp6LhHqg55rjClkM=
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0/go.mod h1:BgQOMsg8av8jset59jelyPW7NoZcZXLVpDsXunGDrk8=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.12 h1:fqg6c1KVrc3SYWma/egWue5rKI4G2+M4wMQN2JosNAA=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.12/go.mod h1:7Yn+p66q/jt38qMoVfNvjbm3D89mGBnkwDcijgtih8w=
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.27.4 h1:glNNLfVzW88jz83oPZ4gXndJL7VDDANHowCoJU673OU=
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.27.4/go.mod h1:VUHrcV1XoUd6ZWzIMal9CeAA2EiKkAhmImuRGhNbaxg=
github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@ -1268,7 +1232,6 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
@ -1288,7 +1251,6 @@ github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOr
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
github.com/go-resty/resty/v2 v2.13.1 h1:x+LHXBI2nMB1vqndymf26quycC4aggYJ7DECYbiz03g=
github.com/go-resty/resty/v2 v2.13.1/go.mod h1:GznXlLxkq6Nh4sU59rPmUw3VtgpO3aS96ORAI6Q7d+0=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
@ -1387,7 +1349,6 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
@ -1449,7 +1410,6 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
@ -1459,7 +1419,6 @@ github.com/google/protobuf v3.11.4+incompatible/go.mod h1:lUQ9D1ePzbH2PrIS7ob/bj
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
@ -1530,8 +1489,6 @@ github.com/gwos/tcg/sdk v0.0.0-20240830123415-f8a34bba6358 h1:QmKzhYk6KMjUutu9Sy
github.com/gwos/tcg/sdk v0.0.0-20240830123415-f8a34bba6358/go.mod h1:h40FJV0HuULqXSSKf7kfCbOxEcQAD74a5e2LC2+rYiQ=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/harlow/kinesis-consumer v0.3.6-0.20240916192723-43900507c911 h1:eLNkr0OcBl7pzM6DCLSgVp3VQyS5ZrLnanXPqH5EmE0=
github.com/harlow/kinesis-consumer v0.3.6-0.20240916192723-43900507c911/go.mod h1:jTE9kH7IVx841D0GgxjykKieSP1yDSckuEg5ceSCjEU=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.29.2 h1:aYyRn8EdE2mSfG14S1+L9Qkjtz8RzmaWh6AcNGRNwPw=
github.com/hashicorp/consul/api v1.29.2/go.mod h1:0YObcaLNDSbtlgzIRtmRXI1ZkeuK0trCBxwZQ4MYnIk=
@ -1722,7 +1679,6 @@ github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/rasw
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw=
@ -1808,7 +1764,6 @@ github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b/go.mod h1:
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linkedin/goavro/v2 v2.13.0 h1:L8eI8GcuciwUkt41Ej62joSZS4kKaYIUdze+6for9NU=
@ -1837,7 +1792,6 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
@ -1891,7 +1845,6 @@ github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos
github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ=
github.com/mdlayher/vsock v1.2.1 h1:pC1mTJTvjo1r9n9fbm7S1j04rCgCzhCOS5DY0zqHlnQ=
github.com/mdlayher/vsock v1.2.1/go.mod h1:NRfCibel++DgeMD8z/hP+PPTjlNJsdPOmxcnENvE+SE=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/mholt/archiver/v3 v3.5.0/go.mod h1:qqTTPUK/HZPFgFQ/TJ3BzvTpF/dPtFVJXdQbCmeMxwc=
github.com/microsoft/ApplicationInsights-Go v0.4.4 h1:G4+H9WNs6ygSCe6sUyxRc2U81TI5Es90b2t/MwX5KqY=
github.com/microsoft/ApplicationInsights-Go v0.4.4/go.mod h1:fKRUseBqkw6bDiXTs3ESTiU/4YTIHsQS4W3fP2ieF4U=
@ -2012,12 +1965,6 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
github.com/onsi/ginkgo/v2 v2.1.6/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk=
github.com/onsi/ginkgo/v2 v2.3.0/go.mod h1:Eew0uilEqZmIEZr8JrvYlvOM7Rr6xzTmMV8AyFNU9d0=
github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo=
github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw=
github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM=
github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
@ -2025,13 +1972,6 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo=
github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc=
github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ1tuM=
github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM=
github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4=
github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.101.0 h1:TCQYvGS2MKTotOTQDnHUSd4ljEzXRzHXopdv71giKWU=
@ -2198,7 +2138,6 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
github.com/robinson/gos7 v0.0.0-20240315073918-1f14519e4846 h1:CnAbtX0j07ZVR/TnD5V6ypFTrASJlfr+fc4OY2da9eg=
github.com/robinson/gos7 v0.0.0-20240315073918-1f14519e4846/go.mod h1:AMHIeh1KJ7Xa2RVOMHdv9jXKrpw0D4EWGGQMHLb2doc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
@ -2235,7 +2174,6 @@ github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/sensu/sensu-go/api/core/v2 v2.16.0 h1:HOq4rFkQ1S5ZjxmMTLc5J5mAbECrnKWvtXXbMqr3j9s=
github.com/sensu/sensu-go/api/core/v2 v2.16.0/go.mod h1:MjM7+MCGEyTAgaZ589SiGHwYiaYF7N/58dU0J070u/0=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
github.com/shirou/gopsutil/v4 v4.24.12 h1:qvePBOk20e0IKA1QXrIIU+jmk+zEiYVVx06WjBRlZo4=
@ -2275,14 +2213,12 @@ github.com/sleepinggenius2/gosmi v0.4.4/go.mod h1:l8OniPmd3bJzw0MXP2/qh7AhP/e+bT
github.com/smallstep/assert v0.0.0-20200723003110-82e2b9b3b262 h1:unQFBIznI+VYD1/1fApl1A+9VcBk+9dcqGfnePY87LY=
github.com/smallstep/assert v0.0.0-20200723003110-82e2b9b3b262/go.mod h1:MyOHs9Po2fbM1LHej6sBUT8ozbxmMOFG+E+rx/GSGuc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs=
github.com/smartystreets/gunit v1.1.3/go.mod h1:EH5qMBab2UclzXUcpR8b93eHsIlp9u+pDQIRp5DZNzQ=
github.com/snowflakedb/gosnowflake v1.11.2 h1:eAMsxrCiC6ij5wX3dHx1TQCBOdDmCK062Ir8rndUkRg=
github.com/snowflakedb/gosnowflake v1.11.2/go.mod h1:WFe+8mpsapDaQjHX6BqJBKtfQCGlGD3lHKeDsKfpx2A=
@ -2370,13 +2306,8 @@ github.com/tidwall/wal v1.1.8 h1:2qDSGdAdjaY3PEvHRva+9UFqgk+ef7cOiW1Qn5JH1y0=
github.com/tidwall/wal v1.1.8/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E=
github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY=
github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro=
github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
github.com/tj/go-buffer v1.0.1/go.mod h1:iyiJpfFcR2B9sXu7KvjbT9fpM4mOelRSDTbntVj52Uc=
github.com/tj/go-elastic v0.0.0-20171221160941-36157cbbebc2/go.mod h1:WjeM0Oo1eNAjXGDx2yma7uG2XoyRZTq1uv3M/o7imD0=
github.com/tj/go-kinesis v0.0.0-20171128231115-08b17f58cb1b/go.mod h1:/yhzCV0xPfx6jb1bBgRFjl5lytqVqZXEaeqWP8lTEao=
github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
@ -2448,7 +2379,6 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/goldmark v1.7.8 h1:iERMLn0/QJeHFhxSt3p6PeN9mGnvIKSpG9YYorDMnic=
github.com/yuin/goldmark v1.7.8/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E=
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg=
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yunify/qingstor-sdk-go/v3 v3.2.0 h1:9sB2WZMgjwSUNZhrgvaNGazVltoFUUfuS9f0uCWtTr8=
@ -2558,7 +2488,6 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -2646,7 +2575,6 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
@ -2908,11 +2836,9 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -3070,7 +2996,6 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
@ -3427,7 +3352,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@ -1,7 +1,10 @@
# Kinesis Consumer Input Plugin
The [Kinesis][kinesis] consumer plugin reads from a Kinesis data stream
and creates metrics using one of the supported [input data formats][].
This plugin consumes records from [AWS Kinesis][kinesis] data stream and
creates metrics using one of the supported [data formats][data_formats].
[kinesis]: https://aws.amazon.com/kinesis/
[data_formats]: /docs/DATA_FORMATS_INPUT.md
## Service Input <!-- @/docs/includes/service_input.md -->
@ -58,9 +61,19 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
## Shard iterator type
## Available options: 'TRIM_HORIZON' (first in non-expired) and 'LATEST'
# shard_iterator_type = "TRIM_HORIZON"
## Interval for checking for new records
## Please consider limits for getting records documented here:
## https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
# poll_interval = "250ms"
## Interval for scanning for new shards created when resharding
## If set to zero, shards are only scanned once on startup.
# shard_update_interval = "30s"
## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
@ -73,27 +86,23 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
##
## The content encoding of the data from kinesis
## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
## Content encoding of the record data
## If you are processing a cloudwatch logs kinesis stream then set this to
## "gzip" as AWS compresses cloudwatch log data before it is sent to kinesis.
# content_encoding = "identity"
## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
## unique name for this consumer
app_name = "default"
table_name = "default"
## Data format of the records to consume
## See https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
## Optional: Configuration for DynamoDB backend to store positions in the stream
# [inputs.kinesis_consumer.checkpoint_dynamodb]
# ## Unique name for this consumer
# app_name = "default"
# ## Table to store the sequence numbers in
# table_name = "default"
# ## Interval for persisting data to limit write operations
# # interval = "10s"
```
### Required AWS IAM permissions
@ -119,9 +128,6 @@ Partition key: namespace
Sort key: shard_id
```
[kinesis]: https://aws.amazon.com/kinesis/
[input data formats]: /docs/DATA_FORMATS_INPUT.md
## Metrics
## Example Output

View File

@ -0,0 +1,355 @@
package kinesis_consumer
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/influxdata/telegraf"
)
type recordHandler func(ctx context.Context, shard string, r *types.Record)
type shardConsumer struct {
seqnr string
interval time.Duration
log telegraf.Logger
client *kinesis.Client
params *kinesis.GetShardIteratorInput
onMessage recordHandler
}
func (c *shardConsumer) consume(ctx context.Context, shard string) ([]types.ChildShard, error) {
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
// Get the first shard iterator
iter, err := c.iterator(ctx)
if err != nil {
return nil, fmt.Errorf("getting first shard iterator failed: %w", err)
}
for {
// Get new records from the shard
resp, err := c.client.GetRecords(ctx, &kinesis.GetRecordsInput{
ShardIterator: iter,
})
if err != nil {
// Handle recoverable errors
var throughputErr *types.ProvisionedThroughputExceededException
var expiredIterErr *types.ExpiredIteratorException
switch {
case errors.As(err, &throughputErr):
// Wait a second before trying again as suggested by
// https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
c.log.Tracef("throughput exceeded when getting records for shard %s...", shard)
time.Sleep(time.Second)
continue
case errors.As(err, &expiredIterErr):
c.log.Tracef("iterator expired for shard %s...", shard)
if iter, err = c.iterator(ctx); err != nil {
return nil, fmt.Errorf("getting shard iterator failed: %w", err)
}
continue
case errors.Is(err, context.Canceled):
return nil, nil
default:
c.log.Tracef("get-records error is of type %T", err)
return nil, fmt.Errorf("getting records failed: %w", err)
}
}
c.log.Tracef("read %d records for shard %s...", len(resp.Records), shard)
// Check if we fully read the shard
if resp.NextShardIterator == nil {
return resp.ChildShards, nil
}
iter = resp.NextShardIterator
// Process the records and keep track of the last sequence number
// consumed for recreating the iterator.
for _, r := range resp.Records {
c.onMessage(ctx, shard, &r)
c.seqnr = *r.SequenceNumber
if errors.Is(ctx.Err(), context.Canceled) {
return nil, nil
}
}
// Wait for the poll interval to pass or cancel
select {
case <-ctx.Done():
return nil, nil
case <-ticker.C:
continue
}
}
}
func (c *shardConsumer) iterator(ctx context.Context) (*string, error) {
for {
resp, err := c.client.GetShardIterator(ctx, c.params)
if err != nil {
var throughputErr *types.ProvisionedThroughputExceededException
if errors.As(err, &throughputErr) {
// We called the function too often and should wait a bit
// until trying again
c.log.Tracef("throughput exceeded when getting iterator for shard %s...", *c.params.ShardId)
time.Sleep(time.Second)
continue
}
return nil, err
}
c.log.Tracef("successfully updated iterator for shard %s (%s)...", *c.params.ShardId, c.seqnr)
return resp.ShardIterator, nil
}
}
type consumer struct {
config aws.Config
stream string
iterType types.ShardIteratorType
pollInterval time.Duration
shardUpdateInterval time.Duration
log telegraf.Logger
onMessage recordHandler
position func(shard string) string
client *kinesis.Client
shardsConsumed map[string]bool
shardConsumers map[string]*shardConsumer
wg sync.WaitGroup
sync.Mutex
}
func (c *consumer) init() error {
if c.stream == "" {
return errors.New("stream cannot be empty")
}
if c.pollInterval <= 0 {
return errors.New("invalid poll interval")
}
if c.onMessage == nil {
return errors.New("message handler is undefined")
}
c.shardsConsumed = make(map[string]bool)
c.shardConsumers = make(map[string]*shardConsumer)
return nil
}
func (c *consumer) start(ctx context.Context) {
// Setup the client
c.client = kinesis.NewFromConfig(c.config)
// Do the initial discovery of shards
if err := c.updateShardConsumers(ctx); err != nil {
c.log.Errorf("Initializing shards failed: %v", err)
}
// If the consumer has a shard-update interval, use a ticker to update
// available shards on a regular basis
if c.shardUpdateInterval <= 0 {
return
}
ticker := time.NewTicker(c.shardUpdateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := c.updateShardConsumers(ctx); err != nil {
c.log.Errorf("Updating shards failed: %v", err)
}
}
}
}
func (c *consumer) updateShardConsumers(ctx context.Context) error {
// List all shards of the given stream
var availableShards []types.Shard
req := &kinesis.ListShardsInput{StreamName: aws.String(c.stream)}
for {
resp, err := c.client.ListShards(ctx, req)
if err != nil {
return fmt.Errorf("listing shards failed: %w", err)
}
availableShards = append(availableShards, resp.Shards...)
if resp.NextToken == nil {
break
}
req = &kinesis.ListShardsInput{NextToken: resp.NextToken}
}
c.log.Tracef("got %d shards during update", len(availableShards))
// All following operations need to be locked to create a consistent
// state of the shards and consumers
c.Lock()
defer c.Unlock()
// Filter out all shards actively consumed already
inactiveShards := make([]types.Shard, 0, len(availableShards))
for _, shard := range availableShards {
id := *shard.ShardId
if _, found := c.shardConsumers[id]; found {
c.log.Tracef("shard %s is actively consumed...", id)
continue
}
c.log.Tracef("shard %s is not actively consumed...", id)
inactiveShards = append(inactiveShards, shard)
}
// Fill the shards already consumed and get the positions if the consumer
// is backed by an iterator store
newShards := make([]types.Shard, 0, len(inactiveShards))
seqnrs := make(map[string]string, len(inactiveShards))
for _, shard := range inactiveShards {
id := *shard.ShardId
if c.shardsConsumed[id] {
c.log.Tracef("shard %s is already fully consumed...", id)
continue
}
c.log.Tracef("shard %s is not fully consumed...", id)
// Retrieve the shard position from the store
if c.position != nil {
seqnr := c.position(id)
if seqnr == "" {
// A truely new shard
newShards = append(newShards, shard)
c.log.Tracef("shard %s is new...", id)
continue
}
seqnrs[id] = seqnr
// Check if we already fully consumed for closed shards
end := shard.SequenceNumberRange.EndingSequenceNumber
if end != nil && *end == seqnr {
c.log.Tracef("shard %s is closed and already fully consumed...", id)
c.shardsConsumed[id] = true
continue
}
c.log.Tracef("shard %s is not yet fully consumed...", id)
}
// The shard is not fully consumed yet so save the sequence number
// and the shard as "new".
newShards = append(newShards, shard)
}
// Filter all shards already fully consumed and create a new consumer for
// every remaining new shard respecting resharding artifacts
for _, shard := range newShards {
id := *shard.ShardId
// Handle resharding by making sure all parents are consumed already
// before starting a consumer on a child shard. If parents are not
// consumed fully we ignore this shard here as it will be reported
// by the call to `GetRecords` as a child later.
if shard.ParentShardId != nil && *shard.ParentShardId != "" {
pid := *shard.ParentShardId
if !c.shardsConsumed[pid] {
c.log.Tracef("shard %s has parent %s which is not fully consumed yet...", id, pid)
continue
}
}
if shard.AdjacentParentShardId != nil && *shard.AdjacentParentShardId != "" {
pid := *shard.AdjacentParentShardId
if !c.shardsConsumed[pid] {
c.log.Tracef("shard %s has adjacent parent %s which is not fully consumed yet...", id, pid)
continue
}
}
// Create a new consumer and start it
c.wg.Add(1)
go func(shardID string) {
defer c.wg.Done()
c.startShardConsumer(ctx, shardID, seqnrs[shardID])
}(id)
}
return nil
}
func (c *consumer) startShardConsumer(ctx context.Context, id, seqnr string) {
c.log.Tracef("starting consumer for shard %s at sequence number %q...", id, seqnr)
sc := &shardConsumer{
seqnr: seqnr,
interval: c.pollInterval,
log: c.log,
onMessage: c.onMessage,
client: c.client,
params: &kinesis.GetShardIteratorInput{
ShardId: &id,
ShardIteratorType: c.iterType,
StreamName: &c.stream,
},
}
if seqnr != "" {
sc.params.ShardIteratorType = types.ShardIteratorTypeAfterSequenceNumber
sc.params.StartingSequenceNumber = &seqnr
}
c.shardConsumers[id] = sc
childs, err := sc.consume(ctx, id)
if err != nil {
c.log.Errorf("Consuming shard %s failed: %v", id, err)
return
}
c.log.Tracef("finished consuming shard %s", id)
c.Lock()
defer c.Unlock()
c.shardsConsumed[id] = true
delete(c.shardConsumers, id)
for _, shard := range childs {
cid := *shard.ShardId
startable := true
for _, pid := range shard.ParentShards {
startable = startable && c.shardsConsumed[pid]
}
if !startable {
c.log.Tracef("child shard %s of shard %s is not startable as parents are fully consumed yet...", cid, id)
continue
}
c.log.Tracef("child shard %s of shard %s is startable...", cid, id)
var cseqnr string
if c.position != nil {
cseqnr = c.position(cid)
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.startShardConsumer(ctx, cid, cseqnr)
}()
}
}
func (c *consumer) stop() {
c.wg.Wait()
}

View File

@ -5,16 +5,15 @@ import (
"context"
_ "embed"
"errors"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
consumer "github.com/harlow/kinesis-consumer"
"github.com/harlow/kinesis-consumer/store/ddb"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
common_aws "github.com/influxdata/telegraf/plugins/common/aws"
"github.com/influxdata/telegraf/plugins/inputs"
@ -28,44 +27,46 @@ var once sync.Once
type KinesisConsumer struct {
StreamName string `toml:"streamname"`
ShardIteratorType string `toml:"shard_iterator_type"`
PollInterval config.Duration `toml:"poll_interval"`
ShardUpdateInterval config.Duration `toml:"shard_update_interval"`
DynamoDB *dynamoDB `toml:"checkpoint_dynamodb"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
ContentEncoding string `toml:"content_encoding"`
Log telegraf.Logger `toml:"-"`
common_aws.CredentialConfig
cons *consumer.Consumer
parser telegraf.Parser
cancel context.CancelFunc
acc telegraf.TrackingAccumulator
sem chan struct{}
parser telegraf.Parser
checkpoint consumer.Store
checkpoints map[string]checkpoint
records map[telegraf.TrackingID]string
checkpointTex sync.Mutex
recordsTex sync.Mutex
wg sync.WaitGroup
cfg aws.Config
consumer *consumer
cancel context.CancelFunc
sem chan struct{}
iteratorStore *store
records map[telegraf.TrackingID]iterator
recordsTex sync.Mutex
wg sync.WaitGroup
contentDecodingFunc decodingFunc
lastSeqNum string
}
type dynamoDB struct {
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
}
type checkpoint struct {
streamName string
shardID string
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
Interval config.Duration `toml:"interval"`
}
func (*KinesisConsumer) SampleConfig() string {
return sampleConfig
}
func (k *KinesisConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
}
func (k *KinesisConsumer) Init() error {
// Set defaults
if k.MaxUndeliveredMessages < 1 {
@ -79,140 +80,132 @@ func (k *KinesisConsumer) Init() error {
k.ContentEncoding = "identity"
}
// Check input params
if k.StreamName == "" {
return errors.New("stream name cannot be empty")
}
f, err := getDecodingFunc(k.ContentEncoding)
if err != nil {
return err
}
k.contentDecodingFunc = f
return nil
}
func (k *KinesisConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
}
func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error {
return k.connect(acc)
}
func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
if k.cons == nil {
return k.connect(acc)
}
// Enforce writing of last received sequence number
k.lastSeqNum = ""
return nil
}
func (k *KinesisConsumer) Stop() {
k.cancel()
k.wg.Wait()
}
// GetCheckpoint wraps the checkpoint's GetCheckpoint function (called by consumer library)
func (k *KinesisConsumer) GetCheckpoint(streamName, shardID string) (string, error) {
return k.checkpoint.GetCheckpoint(streamName, shardID)
}
// SetCheckpoint wraps the checkpoint's SetCheckpoint function (called by consumer library)
func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
if sequenceNumber == "" {
return errors.New("sequence number should not be empty")
if k.DynamoDB != nil {
if k.DynamoDB.Interval <= 0 {
k.DynamoDB.Interval = config.Duration(10 * time.Second)
}
k.iteratorStore = newStore(k.DynamoDB.AppName, k.DynamoDB.TableName, time.Duration(k.DynamoDB.Interval), k.Log)
}
k.checkpointTex.Lock()
k.checkpoints[sequenceNumber] = checkpoint{streamName: streamName, shardID: shardID}
k.checkpointTex.Unlock()
k.records = make(map[telegraf.TrackingID]iterator, k.MaxUndeliveredMessages)
k.sem = make(chan struct{}, k.MaxUndeliveredMessages)
return nil
}
func (k *KinesisConsumer) connect(acc telegraf.Accumulator) error {
// Setup the client to connect to the Kinesis service
cfg, err := k.CredentialConfig.Credentials()
if err != nil {
return err
}
if k.EndpointURL != "" {
cfg.BaseEndpoint = &k.EndpointURL
}
if k.Log.Level().Includes(telegraf.Trace) {
logWrapper := &telegrafLoggerWrapper{k.Log}
cfg.Logger = logWrapper
cfg.ClientLogMode = aws.LogRetries
}
k.cfg = cfg
logWrapper := &telegrafLoggerWrapper{k.Log}
cfg.Logger = logWrapper
cfg.ClientLogMode = aws.LogRetries
client := kinesis.NewFromConfig(cfg)
return nil
}
k.checkpoint = &noopStore{}
if k.DynamoDB != nil {
var err error
k.checkpoint, err = ddb.New(
k.DynamoDB.AppName,
k.DynamoDB.TableName,
ddb.WithDynamoClient(dynamodb.NewFromConfig(cfg)),
ddb.WithMaxInterval(time.Second*10),
)
if err != nil {
return err
func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error {
k.acc = acc.WithTracking(k.MaxUndeliveredMessages)
// Start the store if necessary
if k.iteratorStore != nil {
if err := k.iteratorStore.run(context.Background()); err != nil {
return fmt.Errorf("starting DynamoDB store failed: %w", err)
}
}
cons, err := consumer.New(
k.StreamName,
consumer.WithClient(client),
consumer.WithShardIteratorType(k.ShardIteratorType),
consumer.WithStore(k),
consumer.WithLogger(logWrapper),
)
if err != nil {
return err
}
k.cons = cons
k.acc = acc.WithTracking(k.MaxUndeliveredMessages)
k.records = make(map[telegraf.TrackingID]string, k.MaxUndeliveredMessages)
k.checkpoints = make(map[string]checkpoint, k.MaxUndeliveredMessages)
k.sem = make(chan struct{}, k.MaxUndeliveredMessages)
ctx := context.Background()
ctx, k.cancel = context.WithCancel(ctx)
// Setup the consumer
k.consumer = &consumer{
config: k.cfg,
stream: k.StreamName,
iterType: types.ShardIteratorType(k.ShardIteratorType),
pollInterval: time.Duration(k.PollInterval),
shardUpdateInterval: time.Duration(k.ShardUpdateInterval),
log: k.Log,
onMessage: func(ctx context.Context, shard string, r *types.Record) {
// Checking for number of messages in flight and wait for a free
// slot in case there are too many
select {
case <-ctx.Done():
return
case k.sem <- struct{}{}:
break
}
if err := k.onMessage(k.acc, shard, r); err != nil {
seqnr := *r.SequenceNumber
k.Log.Errorf("Processing message with sequence number %q in shard %s failed: %v", seqnr, shard, err)
<-k.sem
}
},
}
// Link in the backing iterator store
if k.iteratorStore != nil {
k.consumer.position = func(shard string) string {
seqnr, err := k.iteratorStore.get(ctx, k.StreamName, shard)
if err != nil && !errors.Is(err, errNotFound) {
k.Log.Errorf("retrieving sequence number for shard %q failed: %s", shard, err)
}
return seqnr
}
}
if err := k.consumer.init(); err != nil {
return fmt.Errorf("initializing consumer failed: %w", err)
}
// Start the go-routine handling metrics delivered to the output
k.wg.Add(1)
go func() {
defer k.wg.Done()
k.onDelivery(ctx)
}()
// Start the go-routine handling message consumption
k.wg.Add(1)
go func() {
defer k.wg.Done()
err := k.cons.Scan(ctx, func(r *consumer.Record) error {
select {
case <-ctx.Done():
return ctx.Err()
case k.sem <- struct{}{}:
break
}
if err := k.onMessage(k.acc, r); err != nil {
<-k.sem
k.Log.Errorf("Scan parser error: %v", err)
}
return nil
})
if err != nil {
k.cancel()
k.Log.Errorf("Scan encountered an error: %v", err)
k.cons = nil
}
k.consumer.start(ctx)
}()
return nil
}
func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
func (*KinesisConsumer) Gather(telegraf.Accumulator) error {
return nil
}
func (k *KinesisConsumer) Stop() {
k.cancel()
k.wg.Wait()
k.consumer.stop()
if k.iteratorStore != nil {
k.iteratorStore.stop()
}
}
// onMessage is called for new messages consumed from Kinesis
func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, shard string, r *types.Record) error {
data, err := k.contentDecodingFunc(r.Data)
if err != nil {
return err
@ -228,61 +221,58 @@ func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consume
})
}
seqnr := *r.SequenceNumber
k.recordsTex.Lock()
defer k.recordsTex.Unlock()
id := acc.AddTrackingMetricGroup(metrics)
k.records[id] = *r.SequenceNumber
k.recordsTex.Unlock()
k.records[id] = iterator{shard: shard, seqnr: seqnr}
return nil
}
// onDelivery is called for every metric successfully delivered to the outputs
func (k *KinesisConsumer) onDelivery(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case info := <-k.acc.Delivered():
k.recordsTex.Lock()
sequenceNum, ok := k.records[info.ID()]
if !ok {
k.recordsTex.Unlock()
continue
// Store the metric iterator in DynamoDB if configured
if k.iteratorStore != nil {
k.storeDelivered(info.ID())
}
// Reduce the number of undelivered messages by reading from the channel
<-k.sem
delete(k.records, info.ID())
k.recordsTex.Unlock()
if !info.Delivered() {
k.Log.Debug("Metric group failed to process")
continue
}
if k.lastSeqNum != "" {
continue
}
// Store the sequence number at least once per gather cycle using the checkpoint
// storage (usually DynamoDB).
k.checkpointTex.Lock()
chk, ok := k.checkpoints[sequenceNum]
if !ok {
k.checkpointTex.Unlock()
continue
}
delete(k.checkpoints, sequenceNum)
k.checkpointTex.Unlock()
k.Log.Tracef("persisting sequence number %q for stream %q and shard %q", sequenceNum)
k.lastSeqNum = sequenceNum
if err := k.checkpoint.SetCheckpoint(chk.streamName, chk.shardID, sequenceNum); err != nil {
k.Log.Errorf("Setting checkpoint failed: %v", err)
}
}
}
}
func (k *KinesisConsumer) storeDelivered(id telegraf.TrackingID) {
k.recordsTex.Lock()
defer k.recordsTex.Unlock()
// Find the iterator belonging to the delivered message
iter, ok := k.records[id]
if !ok {
k.Log.Debugf("No iterator found for delivered metric %v!", id)
return
}
// Remove metric
delete(k.records, id)
// Store the iterator in the database
k.iteratorStore.set(k.StreamName, iter.shard, iter.seqnr)
}
func init() {
inputs.Add("kinesis_consumer", func() telegraf.Input {
return &KinesisConsumer{}
return &KinesisConsumer{
PollInterval: config.Duration(250 * time.Millisecond),
ShardUpdateInterval: config.Duration(30 * time.Second),
}
})
}

View File

@ -6,7 +6,6 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
consumer "github.com/harlow/kinesis-consumer"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
@ -16,6 +15,7 @@ import (
func TestInvalidCoding(t *testing.T) {
plugin := &KinesisConsumer{
StreamName: "foo",
ContentEncoding: "notsupported",
}
require.ErrorContains(t, plugin.Init(), "unknown content encoding")
@ -62,32 +62,25 @@ func TestOnMessage(t *testing.T) {
tests := []struct {
name string
encoding string
records map[telegraf.TrackingID]string
args *consumer.Record
record *types.Record
expectedNumber int
expectedContent string
}{
{
name: "test no compression",
encoding: "none",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
record: &types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
expectedNumber: 2,
expectedContent: "bob",
},
{
name: "test no compression via empty string for ContentEncoding",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
name: "test no compression via empty string for ContentEncoding",
record: &types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
expectedNumber: 2,
expectedContent: "bob",
@ -95,24 +88,18 @@ func TestOnMessage(t *testing.T) {
{
name: "test no compression via identity ContentEncoding",
encoding: "identity",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
record: &types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
expectedNumber: 2,
expectedContent: "bob",
},
{
name: "test no compression via no ContentEncoding",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
name: "test no compression via no ContentEncoding",
record: &types.Record{
Data: notZippedBytes,
SequenceNumber: aws.String("anything"),
},
expectedNumber: 2,
expectedContent: "bob",
@ -120,12 +107,9 @@ func TestOnMessage(t *testing.T) {
{
name: "test gzip compression",
encoding: "gzip",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: gzippedBytes,
SequenceNumber: aws.String("anything"),
},
record: &types.Record{
Data: gzippedBytes,
SequenceNumber: aws.String("anything"),
},
expectedNumber: 1,
expectedContent: "bob",
@ -133,12 +117,9 @@ func TestOnMessage(t *testing.T) {
{
name: "test zlib compression",
encoding: "zlib",
records: make(map[telegraf.TrackingID]string),
args: &consumer.Record{
Record: types.Record{
Data: zlibBytpes,
SequenceNumber: aws.String("anything"),
},
record: &types.Record{
Data: zlibBytpes,
SequenceNumber: aws.String("anything"),
},
expectedNumber: 1,
expectedContent: "bob",
@ -157,14 +138,16 @@ func TestOnMessage(t *testing.T) {
// Setup plugin
plugin := &KinesisConsumer{
StreamName: "foo",
ContentEncoding: tt.encoding,
Log: &testutil.Logger{},
parser: parser,
records: tt.records,
records: make(map[telegraf.TrackingID]iterator),
}
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
require.NoError(t, plugin.onMessage(acc.WithTracking(tt.expectedNumber), tt.args))
require.NoError(t, plugin.onMessage(acc.WithTracking(tt.expectedNumber), "test", tt.record))
actual := acc.GetTelegrafMetrics()
require.Len(t, actual, tt.expectedNumber)

View File

@ -1,7 +0,0 @@
package kinesis_consumer
// noopStore implements the storage interface with discard
type noopStore struct{}
func (noopStore) SetCheckpoint(_, _, _ string) error { return nil }
func (noopStore) GetCheckpoint(_, _ string) (string, error) { return "", nil }

View File

@ -30,9 +30,19 @@
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
## Shard iterator type
## Available options: 'TRIM_HORIZON' (first in non-expired) and 'LATEST'
# shard_iterator_type = "TRIM_HORIZON"
## Interval for checking for new records
## Please consider limits for getting records documented here:
## https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
# poll_interval = "250ms"
## Interval for scanning for new shards created when resharding
## If set to zero, shards are only scanned once on startup.
# shard_update_interval = "30s"
## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
@ -45,24 +55,20 @@
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
##
## The content encoding of the data from kinesis
## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
## Content encoding of the record data
## If you are processing a cloudwatch logs kinesis stream then set this to
## "gzip" as AWS compresses cloudwatch log data before it is sent to kinesis.
# content_encoding = "identity"
## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
## unique name for this consumer
app_name = "default"
table_name = "default"
## Data format of the records to consume
## See https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
## Optional: Configuration for DynamoDB backend to store positions in the stream
# [inputs.kinesis_consumer.checkpoint_dynamodb]
# ## Unique name for this consumer
# app_name = "default"
# ## Table to store the sequence numbers in
# table_name = "default"
# ## Interval for persisting data to limit write operations
# # interval = "10s"

View File

@ -0,0 +1,179 @@
package kinesis_consumer
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/influxdata/telegraf"
)
var errNotFound = errors.New("no iterator found")
type iterator struct {
stream string
shard string
seqnr string
modified bool
}
type store struct {
app string
table string
interval time.Duration
log telegraf.Logger
client *dynamodb.Client
iterators map[string]iterator
wg sync.WaitGroup
cancel context.CancelFunc
sync.Mutex
}
func newStore(app, table string, interval time.Duration, log telegraf.Logger) *store {
s := &store{
app: app,
table: table,
interval: interval,
log: log,
}
// Initialize the iterator states
s.iterators = make(map[string]iterator)
return s
}
func (s *store) run(ctx context.Context) error {
rctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
// Create a client to connect to DynamoDB
cfg, err := config.LoadDefaultConfig(rctx)
if err != nil {
return fmt.Errorf("loading default config failed: %w", err)
}
s.client = dynamodb.NewFromConfig(cfg)
// Start the go-routine that pushes the states out to DynamoDB on a
// regular interval
s.wg.Add(1)
go func() {
defer s.wg.Done()
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-rctx.Done():
return
case <-ticker.C:
s.write(rctx)
}
}
}()
return nil
}
func (s *store) stop() {
ctx, cancel := context.WithTimeout(context.Background(), s.interval)
defer cancel()
s.write(ctx)
s.cancel()
s.wg.Wait()
}
func (s *store) write(ctx context.Context) {
s.Lock()
defer s.Unlock()
for k, iter := range s.iterators {
// Only write iterators modified since the last write
if !iter.modified {
continue
}
if _, err := s.client.PutItem(
ctx,
&dynamodb.PutItemInput{
TableName: aws.String(s.table),
Item: map[string]types.AttributeValue{
"namespace": &types.AttributeValueMemberS{Value: s.app + "-" + iter.stream},
"shard_id": &types.AttributeValueMemberS{Value: iter.shard},
"sequence_number": &types.AttributeValueMemberS{Value: iter.seqnr},
},
}); err != nil {
s.log.Errorf("storing iterator %s-%s/%s/%s failed: %v", s.app, iter.stream, iter.shard, iter.seqnr, err)
}
// Mark state as saved
iter.modified = false
s.iterators[k] = iter
}
}
func (s *store) set(stream, shard, seqnr string) {
s.Lock()
defer s.Unlock()
s.iterators[stream+"/"+shard] = iterator{
stream: stream,
shard: shard,
seqnr: seqnr,
modified: true,
}
}
func (s *store) get(ctx context.Context, stream, shard string) (string, error) {
s.Lock()
defer s.Unlock()
// Return the cached result if possible
if iter, found := s.iterators[stream+"/"+shard]; found {
return iter.seqnr, nil
}
// Retrieve the information from the database
resp, err := s.client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(s.table),
ConsistentRead: aws.Bool(true),
Key: map[string]types.AttributeValue{
"namespace": &types.AttributeValueMemberS{Value: s.app + "-" + stream},
"shard_id": &types.AttributeValueMemberS{Value: shard},
},
})
if err != nil {
return "", err
}
// Extract the sequence number
raw, found := resp.Item["sequence_number"]
if !found {
return "", fmt.Errorf("%w for %s-%s/%s", errNotFound, s.app, stream, shard)
}
seqnr, ok := raw.(*types.AttributeValueMemberS)
if !ok {
return "", fmt.Errorf("sequence number for %s-%s/%s is of unexpected type %T", s.app, stream, shard, raw)
}
// Fill the cache
s.iterators[stream+"/"+shard] = iterator{
stream: stream,
shard: shard,
seqnr: seqnr.Value,
}
return seqnr.Value, nil
}