diff --git a/plugins/outputs/event_hubs/README.md b/plugins/outputs/event_hubs/README.md index 24ed32f49..7f9915cf9 100644 --- a/plugins/outputs/event_hubs/README.md +++ b/plugins/outputs/event_hubs/README.md @@ -21,8 +21,16 @@ JSON is probably the easiest to integrate with downstream components. ## The full connection string to the Event Hub (required) ## The shared access key must have "Send" permissions on the target Event Hub. connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" + ## Client timeout (defaults to 30s) # timeout = "30s" + + ## Partition key + ## Metric tag or field name to use for the event partition key. The value of + ## this tag or field is set as the key for events if it exists. If both, tag + ## and field, exist the tag is preferred. + # partition_key = "" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index e6a4c1da5..5a0c8d3d6 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -50,7 +50,8 @@ func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterat type EventHubs struct { Log telegraf.Logger `toml:"-"` ConnectionString string `toml:"connection_string"` - Timeout config.Duration + Timeout config.Duration `toml:"timeout"` + PartitionKey string `toml:"partition_key"` Hub EventHubInterface serializer serializers.Serializer @@ -102,7 +103,18 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error { continue } - events = append(events, eventhub.NewEvent(payload)) + event := eventhub.NewEvent(payload) + if e.PartitionKey != "" { + if key, ok := metric.GetTag(e.PartitionKey); ok { + event.PartitionKey = &key + } else if key, ok := metric.GetField(e.PartitionKey); ok { + if strKey, ok := key.(string); ok { + event.PartitionKey = &strKey + } + } + } + + events = append(events, event) } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))