diff --git a/plugins/outputs/event_hubs/README.md b/plugins/outputs/event_hubs/README.md index c7c3514c8..089176d8c 100644 --- a/plugins/outputs/event_hubs/README.md +++ b/plugins/outputs/event_hubs/README.md @@ -31,6 +31,12 @@ JSON is probably the easiest to integrate with downstream components. ## and field, exist the tag is preferred. # partition_key = "" + ## Set the maximum batch message size in bytes + ## The allowable size depends on the Event Hub tier + ## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers + ## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes) + # max_message_size = 1000000 + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index 74ef5e44e..618556cf8 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -58,9 +58,11 @@ type EventHubs struct { ConnectionString string `toml:"connection_string"` Timeout config.Duration `toml:"timeout"` PartitionKey string `toml:"partition_key"` + MaxMessageSize int `toml:"max_message_size"` - Hub EventHubInterface - serializer serializers.Serializer + Hub EventHubInterface + batchOptions []eventhub.BatchOption + serializer serializers.Serializer } const ( @@ -78,6 +80,10 @@ func (e *EventHubs) Init() error { return err } + if e.MaxMessageSize > 0 { + e.batchOptions = append(e.batchOptions, eventhub.BatchWithMaxSizeInBytes(e.MaxMessageSize)) + } + return nil } @@ -130,7 +136,7 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) defer cancel() - err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...)) + err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...) if err != nil { return err diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index 788491e61..661da7cf4 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -49,6 +49,7 @@ func TestInitAndWrite(t *testing.T) { Hub: mockHub, ConnectionString: "mock", Timeout: config.Duration(time.Second * 5), + MaxMessageSize: 1000000, serializer: serializer, } diff --git a/plugins/outputs/event_hubs/sample.conf b/plugins/outputs/event_hubs/sample.conf index 6290e1e83..426500ea7 100644 --- a/plugins/outputs/event_hubs/sample.conf +++ b/plugins/outputs/event_hubs/sample.conf @@ -13,6 +13,12 @@ ## and field, exist the tag is preferred. # partition_key = "" + ## Set the maximum batch message size in bytes + ## The allowable size depends on the Event Hub tier + ## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers + ## Setting this to 0 means using the default size from the Azure Event Hubs Client library (1000000 bytes) + # max_message_size = 1000000 + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: