chore(outputs.event_hubs): Switch to maintained library (#16478)

This commit is contained in:
Sven Rebhan 2025-02-24 16:10:36 +01:00 committed by GitHub
parent c4fe39376c
commit 1eeabf9c97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 438 additions and 246 deletions

View File

@ -166,7 +166,11 @@ jobs:
- check-changed-files-or-halt
- run: 'sh ./scripts/installgo_linux.sh'
- run: 'make deps'
- run: 'make test-integration'
- run:
name: "Run integration tests"
command: make test-integration
environment:
AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA: yes
test-go-mac:
executor: mac
steps:

View File

@ -18,6 +18,7 @@ following works:
- github.com/Azure/azure-sdk-for-go/sdk/azcore [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/azcore/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/azidentity [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/azidentity/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/internal [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/internal/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/monitor/armmonitor/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/resources/armresources/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/storage/azblob [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/storage/azblob/LICENSE.txt)

8
go.mod
View File

@ -15,6 +15,7 @@ require (
github.com/Azure/azure-kusto-go v0.16.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0
github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe
@ -191,7 +192,8 @@ require (
github.com/srebhan/protobufquery v1.0.1
github.com/stretchr/testify v1.10.0
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62
github.com/testcontainers/testcontainers-go v0.34.0
github.com/testcontainers/testcontainers-go v0.35.0
github.com/testcontainers/testcontainers-go/modules/azurite v0.35.0
github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0
github.com/thomasklein94/packer-plugin-libvirt v0.5.0
github.com/tidwall/gjson v1.18.0
@ -246,7 +248,7 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect
cloud.google.com/go/compute/metadata v0.6.0 // indirect
cloud.google.com/go/iam v1.2.2 // indirect
code.cloudfoundry.org/clock v1.0.0 // indirect
code.cloudfoundry.org/clock v1.2.0 // indirect
dario.cat/mergo v1.0.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
@ -255,7 +257,7 @@ require (
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 // indirect
github.com/Azure/go-amqp v1.0.0 // indirect
github.com/Azure/go-amqp v1.0.5 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.6 // indirect

29
go.sum
View File

@ -624,8 +624,8 @@ cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vf
cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA=
cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw=
code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o=
code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
code.cloudfoundry.org/clock v1.2.0 h1:1swXS7yPmQmhAdkTb1nJ2c0geOdf4LvibUleNCo2HjA=
code.cloudfoundry.org/clock v1.2.0/go.mod h1:foDbmVp5RIuIGlota90ot4FkJtx5m4+oKoWiVuu2FDg=
collectd.org v0.6.0 h1:wDTcB13Zork7m9bEHmU2sVL4z+hxBmm8EyeMjjxtW7s=
collectd.org v0.6.0/go.mod h1:fXcRZb1qBKshIHJa2T8qBS7Xew/I43iMutefnTdGeYo=
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
@ -658,8 +658,14 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1 h1:1mvYtZfWQAnwNah/C+Z+J
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1/go.mod h1:75I/mXtme1JyWFtz8GocPHVFyH421IBoZErnO16dd0k=
github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.1 h1:Bk5uOhSAenHyR5P61D/NzeQCv+4fEVV8mOkJ82NqpWw=
github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.1/go.mod h1:QZ4pw3or1WPmRBxf0cHd1tknzrT54WPBOQoGutCPvSU=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0 h1:aJG+Jxd9/rrLwf8R1Ko0RlOBTJASs/lGQJ8b9AdlKTc=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0/go.mod h1:41ONblJrPxDcnVr+voS+3xXWy/KnZLh+7zY5s6woAlQ=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 h1:6bVZts/82H+hax9b3vdmSpi7+Hw9uWvEaJHeKlafnW4=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3/go.mod h1:qf3s/6aV9ePKYGeEYPsbndK6GGfeS7SrbA6OE/T7NIA=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 h1:+dggnR89/BIIlRlQ6d19dkhhdd/mQUiQbXhyHUFiB4w=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0/go.mod h1:tI9M2Q/ueFi287QRkdrhb9LHm6ZnXgkVYLRC3FhYkPw=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0 h1:PTFGRSlMKCQelWwxUyYVEUqseBJVemLyqWJjvMyt0do=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0/go.mod h1:LRr2FzBTQlONPPa5HREE5+RjSCTXl7BwOvYOaWTqCaI=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/managementgroups/armmanagementgroups v1.0.0 h1:pPvTJ1dY0sA35JOeFq6TsY2xj6Z85Yo23Pj4wCCvu4o=
@ -678,10 +684,12 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.2.2 h1:PmDhkIT8S5U4nkY/s78Xmf7CXT8qCliNEBhbrkBp3Q0=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.2.2/go.mod h1:Kj2pCkQ47klX1aAlDnlN/BUvwBiARqIJkc9iw1Up7q8=
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 h1:lJwNFV+xYjHREUTHJKx/ZF6CJSt9znxmLw9DqSTvyRU=
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0/go.mod h1:GfT0aGew8Qj5yiQVqOO5v7N8fanbJGyUoHqXg56qcVY=
github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe h1:HGuouUM1533rBXmMtR7qh5pYNSSjUZG90b/MgJAnb/A=
github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA=
github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
@ -1353,6 +1361,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -1704,8 +1713,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmhodges/clock v1.2.0 h1:eq4kys+NI0PLngzaHEe7AmPT90XMGIEySD1JfV1PDIs=
github.com/jmhodges/clock v1.2.0/go.mod h1:qKjhA7x7u/lQpPB1XAqX1b1lCI/w3/fNuYpI/ZjLynI=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
@ -2338,8 +2347,10 @@ github.com/t3rm1n4l/go-mega v0.0.0-20240219080617-d494b6a8ace7/go.mod h1:suDIky6
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 h1:Oj2e7Sae4XrOsk3ij21QjjEgAcVSeo9nkp0dI//cD2o=
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62/go.mod h1:qUzPVlSj2UgxJkVbH0ZwuuiR46U8RBMDT5KLY78Ifpw=
github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0=
github.com/testcontainers/testcontainers-go v0.34.0 h1:5fbgF0vIN5u+nD3IWabQwRybuB4GY8G2HHgCkbMzMHo=
github.com/testcontainers/testcontainers-go v0.34.0/go.mod h1:6P/kMkQe8yqPHfPWNulFGdFHTD8HB2vLq/231xY2iPQ=
github.com/testcontainers/testcontainers-go v0.35.0 h1:uADsZpTKFAtp8SLK+hMwSaa+X+JiERHtd4sQAFmXeMo=
github.com/testcontainers/testcontainers-go v0.35.0/go.mod h1:oEVBj5zrfJTrgjwONs1SsRbnBtH9OKl+IGl3UMcr2B4=
github.com/testcontainers/testcontainers-go/modules/azurite v0.35.0 h1:gUZ25e1DVE/0+ZZ0nupsIo+C1j7UNloN7Pkg3w6tceI=
github.com/testcontainers/testcontainers-go/modules/azurite v0.35.0/go.mod h1:2Fc67EpyOEexLAF99zhSuzu9H22zd83pkjxEHHTtHf4=
github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0 h1:LrMlsBH+nKJ2c6M7rOjbi7UivgofgAQo+LAwsWttR+Q=
github.com/testcontainers/testcontainers-go/modules/kafka v0.34.0/go.mod h1:4BIbeoKY/ZAf86MvWT5xJW5TvxbCPg67I5rBvwFsx4A=
github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs=
@ -3510,6 +3521,8 @@ modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
modernc.org/z v1.5.1/go.mod h1:eWFB510QWW5Th9YGZT81s+LwvaAs3Q2yr4sP0rmLkv8=
moul.io/http2curl/v2 v2.3.0 h1:9r3JfDzWPcbIklMOs2TnIFzDYvfAZvjeavG6EzP7jYs=
moul.io/http2curl/v2 v2.3.0/go.mod h1:RW4hyBjTWSYDOxapodpNEtX0g5Eb16sxklBqmd2RHcE=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw=
pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=

View File

@ -30,24 +30,25 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Configuration for Event Hubs output plugin
[[outputs.event_hubs]]
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
## Full connection string to the Event Hub instance. The shared access key
## must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
## Client timeout (defaults to 30s)
# timeout = "30s"
## Partition key
## Partition key to use for the event
## Metric tag or field name to use for the event partition key. The value of
## this tag or field is set as the key for events if it exists. If both, tag
## and field, exist the tag is preferred.
# partition_key = ""
## Set the maximum batch message size in bytes
## The allowable size depends on the Event Hub tier
## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes)
# max_message_size = 1000000
## The allowable size depends on the Event Hub tier, see
## https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
## for details. If unset the default size defined by Azure Event Hubs is
## used (currently 1,000,000 bytes)
# max_message_size = "1MB"
## Timeout for sending the data
# timeout = "30s"
## Data format to output.
## Each data format has its own unique set of configuration options, read

View File

@ -4,89 +4,57 @@ package event_hubs
import (
"context"
_ "embed"
"errors"
"fmt"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)
//go:embed sample.conf
var sampleConfig string
/*
** Wrapper interface for eventhub.Hub
*/
type EventHubInterface interface {
GetHub(s string) error
Close(ctx context.Context) error
SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error
}
type eventHub struct {
hub *eventhub.Hub
}
func (eh *eventHub) GetHub(s string) error {
hub, err := eventhub.NewHubFromConnectionString(s)
if err != nil {
return err
}
eh.hub = hub
return nil
}
func (eh *eventHub) Close(ctx context.Context) error {
return eh.hub.Close(ctx)
}
func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
return eh.hub.SendBatch(ctx, iterator, opts...)
}
/* End wrapper interface */
type EventHubs struct {
Log telegraf.Logger `toml:"-"`
ConnectionString string `toml:"connection_string"`
Timeout config.Duration `toml:"timeout"`
PartitionKey string `toml:"partition_key"`
MaxMessageSize int `toml:"max_message_size"`
MaxMessageSize config.Size `toml:"max_message_size"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
Hub EventHubInterface
batchOptions []eventhub.BatchOption
serializer telegraf.Serializer
client *azeventhubs.ProducerClient
options azeventhubs.EventDataBatchOptions
serializer telegraf.Serializer
}
const (
defaultRequestTimeout = time.Second * 30
)
func (*EventHubs) SampleConfig() string {
return sampleConfig
}
func (e *EventHubs) Init() error {
err := e.Hub.GetHub(e.ConnectionString)
if err != nil {
return err
}
if e.MaxMessageSize > 0 {
e.batchOptions = append(e.batchOptions, eventhub.BatchWithMaxSizeInBytes(e.MaxMessageSize))
e.options.MaxBytes = uint64(e.MaxMessageSize)
}
return nil
}
func (*EventHubs) Connect() error {
func (e *EventHubs) Connect() error {
cfg := &azeventhubs.ProducerClientOptions{
ApplicationID: internal.FormatFullVersion(),
RetryOptions: azeventhubs.RetryOptions{MaxRetries: -1},
}
client, err := azeventhubs.NewProducerClientFromConnectionString(e.ConnectionString, "", cfg)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
e.client = client
return nil
}
@ -94,13 +62,7 @@ func (e *EventHubs) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
err := e.Hub.Close(ctx)
if err != nil {
return err
}
return nil
return e.client.Close(ctx)
}
func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) {
@ -108,46 +70,96 @@ func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) {
}
func (e *EventHubs) Write(metrics []telegraf.Metric) error {
events := make([]*eventhub.Event, 0, len(metrics))
for _, metric := range metrics {
payload, err := e.serializer.Serialize(metric)
ctx := context.Background()
batchOptions := e.options
batches := make(map[string]*azeventhubs.EventDataBatch)
for i := 0; i < len(metrics); i++ {
m := metrics[i]
// Prepare the payload
payload, err := e.serializer.Serialize(m)
if err != nil {
e.Log.Debugf("Could not serialize metric: %v", err)
e.Log.Errorf("Could not serialize metric: %v", err)
e.Log.Tracef("metric: %+v", m)
continue
}
event := eventhub.NewEvent(payload)
// Get the batcher for the chosen partition
partition := "<default>"
batchOptions.PartitionKey = nil
if e.PartitionKey != "" {
if key, ok := metric.GetTag(e.PartitionKey); ok {
event.PartitionKey = &key
} else if key, ok := metric.GetField(e.PartitionKey); ok {
if strKey, ok := key.(string); ok {
event.PartitionKey = &strKey
if key, ok := m.GetTag(e.PartitionKey); ok {
partition = key
batchOptions.PartitionKey = &partition
} else if key, ok := m.GetField(e.PartitionKey); ok {
if k, ok := key.(string); ok {
partition = k
batchOptions.PartitionKey = &partition
}
}
}
if _, found := batches[partition]; !found {
batches[partition], err = e.client.NewEventDataBatch(ctx, &batchOptions)
if err != nil {
return fmt.Errorf("creating batch for partition %q failed: %w", partition, err)
}
}
events = append(events, event)
// Add the event to the partition and send it if the batch is full
err = batches[partition].AddEventData(&azeventhubs.EventData{Body: payload}, nil)
if err == nil {
continue
}
// If the event doesn't fit into the batch anymore, send the batch
if !errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
return fmt.Errorf("adding metric to batch for partition %q failed: %w", partition, err)
}
// The event is larger than the maximum allowed size so there
// is nothing we can do here but have to drop the metric.
if batches[partition].NumEvents() == 0 {
e.Log.Errorf("Metric with %d bytes exceeds the maximum allowed size and must be dropped!", len(payload))
e.Log.Tracef("metric: %+v", m)
continue
}
if err := e.send(batches[partition]); err != nil {
return fmt.Errorf("sending batch for partition %q failed: %w", partition, err)
}
// Create a new metric and reiterate over the current metric to be
// added in the next iteration of the for loop.
batches[partition], err = e.client.NewEventDataBatch(ctx, &e.options)
if err != nil {
return fmt.Errorf("creating batch for partition %q failed: %w", partition, err)
}
i--
}
// Send the remaining batches that never exceeded the batch size
for partition, batch := range batches {
if batch.NumBytes() == 0 {
continue
}
if err := e.send(batch); err != nil {
return fmt.Errorf("sending batch for partition %q failed: %w", partition, err)
}
}
return nil
}
func (e *EventHubs) send(batch *azeventhubs.EventDataBatch) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...)
if err != nil {
return err
}
return nil
return e.client.SendEventDataBatch(ctx, batch, nil)
}
func init() {
outputs.Add("event_hubs", func() telegraf.Output {
return &EventHubs{
Hub: &eventHub{},
Timeout: config.Duration(defaultRequestTimeout),
Timeout: config.Duration(30 * time.Second),
}
})
}

View File

@ -2,161 +2,293 @@ package event_hubs
import (
"context"
"fmt"
"math/rand"
"log"
"os"
"path/filepath"
"testing"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/stretchr/testify/mock"
"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/azurite"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/testutil"
)
/*
** Wrapper interface mock for eventhub.Hub
*/
type mockEventHub struct {
mock.Mock
}
func (eh *mockEventHub) GetHub(s string) error {
args := eh.Called(s)
return args.Error(0)
}
func (eh *mockEventHub) Close(ctx context.Context) error {
args := eh.Called(ctx)
return args.Error(0)
}
func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
args := eh.Called(ctx, iterator, opts)
return args.Error(0)
}
/* End wrapper interface */
func TestInitAndWrite(t *testing.T) {
serializer := &json.Serializer{}
require.NoError(t, serializer.Init())
mockHub := &mockEventHub{}
e := &EventHubs{
Hub: mockHub,
ConnectionString: "mock",
Timeout: config.Duration(time.Second * 5),
MaxMessageSize: 1000000,
serializer: serializer,
}
mockHub.On("GetHub", mock.Anything).Return(nil).Once()
require.NoError(t, e.Init())
mockHub.AssertExpectations(t)
metrics := testutil.MockMetrics()
mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
require.NoError(t, e.Write(metrics))
mockHub.AssertExpectations(t)
}
/*
** Integration test (requires an Event Hubs instance)
*/
func TestInitAndWriteIntegration(t *testing.T) {
func TestEmulatorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
if os.Getenv("EVENTHUB_CONNECTION_STRING") == "" {
t.Skip("Missing environment variable EVENTHUB_CONNECTION_STRING")
// Require the developers to explicitly accept the EULA of the emulator
if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" {
t.Skip(`
Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator
at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md
and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA
to 'yes'.
`)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// Create a new, empty Event Hub
// NB: for this to work, the connection string needs to grant "Manage" permissions on the root namespace
mHub, err := eventhub.NewHubManagerFromConnectionString(os.Getenv("EVENTHUB_CONNECTION_STRING"))
require.NoError(t, err)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
name := fmt.Sprintf("testmetrics%05d", r.Intn(10000))
entity, err := mHub.Put(ctx, name, eventhub.HubWithPartitionCount(1))
require.NoError(t, err)
// Delete the test hub
// Setup the Azure Event Hub emulator environment
// See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator
ctx := context.Background()
azuriteContainer, err := azurite.Run(ctx, "mcr.microsoft.com/azure-storage/azurite:3.28.0")
require.NoError(t, err, "failed to start Azurite container")
defer func() {
err := mHub.Delete(ctx, entity.Name)
require.NoError(t, err)
if err := testcontainers.TerminateContainer(azuriteContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name
blobPort, err := azuriteContainer.MappedPort(ctx, azurite.BlobPort)
require.NoError(t, err)
// Configure the plugin to target the newly created hub
metadataPort, err := azuriteContainer.MappedPort(ctx, azurite.TablePort)
require.NoError(t, err)
cfgfile, err := filepath.Abs(filepath.Join("testdata", "Config.json"))
require.NoError(t, err, "getting absolute path for config")
emulator := testutil.Container{
Image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest",
Env: map[string]string{
"BLOB_SERVER": "host.docker.internal:" + blobPort.Port(),
"METADATA_SERVER": "host.docker.internal:" + metadataPort.Port(),
"ACCEPT_EULA": "Y",
},
Files: map[string]string{
"/Eventhubs_Emulator/ConfigFiles/Config.json": cfgfile,
},
HostAccessPorts: []int{blobPort.Int(), metadataPort.Int()},
HostConfigModifier: func(hc *container.HostConfig) {
hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
},
ExposedPorts: []string{"5672"},
WaitingFor: wait.ForListeningPort(nat.Port("5672")),
}
require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container")
defer emulator.Terminate()
conn := "Endpoint=sb://" + emulator.Address + ":" + emulator.Ports["5672"] + ";"
conn += "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=test"
// Setup plugin and connect
serializer := &json.Serializer{}
require.NoError(t, serializer.Init())
e := &EventHubs{
Hub: &eventHub{},
ConnectionString: testHubCS,
Timeout: config.Duration(time.Second * 5),
serializer: serializer,
plugin := &EventHubs{
ConnectionString: conn,
Timeout: config.Duration(3 * time.Second),
Log: testutil.Logger{},
}
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Make sure we are connected
require.Eventually(t, func() bool {
return plugin.Write(testutil.MockMetrics()) == nil
}, 3*time.Second, 500*time.Millisecond)
input := []telegraf.Metric{
metric.New(
"test",
map[string]string{
"source": "foo",
"division": "A",
"type": "temperature",
},
map[string]interface{}{
"value": 23,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "foo",
"division": "A",
"type": "humidity",
},
map[string]interface{}{
"value": 59,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "bar",
"division": "B",
"type": "temperature",
},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "bar",
"division": "B",
"type": "humidity",
},
map[string]interface{}{
"value": 87,
},
time.Unix(0, 0),
),
}
// Verify that we can connect to Event Hubs
require.NoError(t, e.Init())
// Verify that we can successfully write data to Event Hubs
metrics := testutil.MockMetrics()
require.NoError(t, e.Write(metrics))
/*
** Verify we can read data back from the test hub
*/
exit := make(chan string)
// Create a hub client for receiving
hub, err := eventhub.NewHubFromConnectionString(testHubCS)
require.NoError(t, err)
// The handler function will pass received messages via the channel
handler := func(_ context.Context, event *eventhub.Event) error {
exit <- string(event.Data)
return nil
}
// Set up the receivers
runtimeInfo, err := hub.GetRuntimeInformation(ctx)
require.NoError(t, err)
for _, partitionID := range runtimeInfo.PartitionIDs {
_, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset("-1"))
require.NoError(t, err)
}
// Wait to receive the same number of messages sent, with timeout
received := 0
wait:
for _, metric := range metrics {
select {
case m := <-exit:
t.Logf("Received for %s: %s", metric.Name(), m)
received = received + 1
case <-time.After(10 * time.Second):
t.Logf("Timeout")
break wait
}
}
// Make sure received == sent
require.Len(t, metrics, received)
require.NoError(t, plugin.Write(input))
}
func TestReconnectIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Require the developers to explicitly accept the EULA of the emulator
if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" {
t.Skip(`
Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator
at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md
and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA
to 'yes'.
`)
}
// Setup the Azure Event Hub emulator environment
// See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator
ctx := context.Background()
azuriteContainer, err := azurite.Run(ctx, "mcr.microsoft.com/azure-storage/azurite:3.28.0")
require.NoError(t, err, "failed to start Azurite container")
defer func() {
if err := testcontainers.TerminateContainer(azuriteContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
blobPort, err := azuriteContainer.MappedPort(ctx, azurite.BlobPort)
require.NoError(t, err)
metadataPort, err := azuriteContainer.MappedPort(ctx, azurite.TablePort)
require.NoError(t, err)
cfgfile, err := filepath.Abs(filepath.Join("testdata", "Config.json"))
require.NoError(t, err, "getting absolute path for config")
emulator := testutil.Container{
Image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest",
Env: map[string]string{
"BLOB_SERVER": "host.docker.internal:" + blobPort.Port(),
"METADATA_SERVER": "host.docker.internal:" + metadataPort.Port(),
"ACCEPT_EULA": "Y",
},
Files: map[string]string{
"/Eventhubs_Emulator/ConfigFiles/Config.json": cfgfile,
},
HostAccessPorts: []int{blobPort.Int(), metadataPort.Int()},
HostConfigModifier: func(hc *container.HostConfig) {
hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
},
ExposedPorts: []string{"5672"},
WaitingFor: wait.ForListeningPort(nat.Port("5672")),
}
require.NoError(t, emulator.Start(), "failed to start Azure Event Hub emulator container")
defer emulator.Terminate()
conn := "Endpoint=sb://" + emulator.Address + ":" + emulator.Ports["5672"] + ";"
conn += "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=test"
// Setup plugin and connect
serializer := &json.Serializer{}
require.NoError(t, serializer.Init())
plugin := &EventHubs{
ConnectionString: conn,
Timeout: config.Duration(3 * time.Second),
Log: testutil.Logger{},
}
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Make sure we are connected
require.Eventually(t, func() bool {
return plugin.Write(testutil.MockMetrics()) == nil
}, 3*time.Second, 500*time.Millisecond)
input := []telegraf.Metric{
metric.New(
"test",
map[string]string{
"source": "foo",
"division": "A",
"type": "temperature",
},
map[string]interface{}{
"value": 23,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "foo",
"division": "A",
"type": "humidity",
},
map[string]interface{}{
"value": 59,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "bar",
"division": "B",
"type": "temperature",
},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "bar",
"division": "B",
"type": "humidity",
},
map[string]interface{}{
"value": 87,
},
time.Unix(0, 0),
),
}
// This write should succeed as we should be able to connect to the
// container
require.NoError(t, plugin.Write(input))
// Pause the container to simulate connection loss. Subsequent writes
// should fail until the container is resumed
require.NoError(t, emulator.Pause())
require.ErrorIs(t, plugin.Write(input), context.DeadlineExceeded)
// Resume the container to check if the plugin reconnects
require.NoError(t, emulator.Resume())
require.NoError(t, plugin.Write(input))
}

View File

@ -1,23 +1,24 @@
# Configuration for Event Hubs output plugin
[[outputs.event_hubs]]
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
## Full connection string to the Event Hub instance. The shared access key
## must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
## Client timeout (defaults to 30s)
# timeout = "30s"
## Partition key
## Partition key to use for the event
## Metric tag or field name to use for the event partition key. The value of
## this tag or field is set as the key for events if it exists. If both, tag
## and field, exist the tag is preferred.
# partition_key = ""
## Set the maximum batch message size in bytes
## The allowable size depends on the Event Hub tier
## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes)
# max_message_size = 1000000
## The allowable size depends on the Event Hub tier, see
## https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
## for details. If unset the default size defined by Azure Event Hubs is
## used (currently 1,000,000 bytes)
# max_message_size = "1MB"
## Timeout for sending the data
# timeout = "30s"
## Data format to output.
## Each data format has its own unique set of configuration options, read

View File

@ -0,0 +1,24 @@
{
"UserConfig": {
"NamespaceConfig": [
{
"Type": "EventHub",
"Name": "emulatorNs1",
"Entities": [
{
"Name": "test",
"PartitionCount": 2,
"ConsumerGroups": [
{
"Name": "cg1"
}
]
}
]
}
],
"LoggingConfig": {
"Type": "Console"
}
}
}

View File

@ -27,6 +27,7 @@ type Container struct {
Entrypoint []string
Env map[string]string
Files map[string]string
HostAccessPorts []int
HostConfigModifier func(*container.HostConfig)
ExposedPorts []string
Cmd []string
@ -62,6 +63,7 @@ func (c *Container) Start() error {
Env: c.Env,
ExposedPorts: c.ExposedPorts,
Files: files,
HostAccessPorts: c.HostAccessPorts,
HostConfigModifier: c.HostConfigModifier,
Cmd: c.Cmd,
Image: c.Image,