feat(outputs.event_hubs): Expose max message size batch option (#11991)

This commit is contained in:
R290 2022-10-12 16:49:11 +02:00 committed by GitHub
parent fae64e2a63
commit 430ec5b960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 3 deletions

View File

@ -31,6 +31,12 @@ JSON is probably the easiest to integrate with downstream components.
## and field, exist the tag is preferred. ## and field, exist the tag is preferred.
# partition_key = "" # partition_key = ""
## Set the maximum batch message size in bytes
## The allowable size depends on the Event Hub tier
## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes)
# max_message_size = 1000000
## Data format to output. ## Data format to output.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:

View File

@ -58,9 +58,11 @@ type EventHubs struct {
ConnectionString string `toml:"connection_string"` ConnectionString string `toml:"connection_string"`
Timeout config.Duration `toml:"timeout"` Timeout config.Duration `toml:"timeout"`
PartitionKey string `toml:"partition_key"` PartitionKey string `toml:"partition_key"`
MaxMessageSize int `toml:"max_message_size"`
Hub EventHubInterface Hub EventHubInterface
serializer serializers.Serializer batchOptions []eventhub.BatchOption
serializer serializers.Serializer
} }
const ( const (
@ -78,6 +80,10 @@ func (e *EventHubs) Init() error {
return err return err
} }
if e.MaxMessageSize > 0 {
e.batchOptions = append(e.batchOptions, eventhub.BatchWithMaxSizeInBytes(e.MaxMessageSize))
}
return nil return nil
} }
@ -130,7 +136,7 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel() defer cancel()
err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...)) err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...)
if err != nil { if err != nil {
return err return err

View File

@ -49,6 +49,7 @@ func TestInitAndWrite(t *testing.T) {
Hub: mockHub, Hub: mockHub,
ConnectionString: "mock", ConnectionString: "mock",
Timeout: config.Duration(time.Second * 5), Timeout: config.Duration(time.Second * 5),
MaxMessageSize: 1000000,
serializer: serializer, serializer: serializer,
} }

View File

@ -13,6 +13,12 @@
## and field, exist the tag is preferred. ## and field, exist the tag is preferred.
# partition_key = "" # partition_key = ""
## Set the maximum batch message size in bytes
## The allowable size depends on the Event Hub tier
## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers
## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes)
# max_message_size = 1000000
## Data format to output. ## Data format to output.
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here: