From ef86635d21fbd27cee5b0961f7fbbb1c9b6a40cb Mon Sep 17 00:00:00 2001 From: massimogallina <114730346+massimogallina@users.noreply.github.com> Date: Tue, 27 Jun 2023 19:40:27 +0200 Subject: [PATCH] feat(inputs.amqp_consumer): Add support to rabbitmq stream queue (#13496) --- plugins/inputs/amqp_consumer/README.md | 4 +++ plugins/inputs/amqp_consumer/amqp_consumer.go | 26 ++++++++++++------- plugins/inputs/amqp_consumer/sample.conf | 4 +++ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index 38cefc266..58550e67e 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -73,6 +73,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## If true, queue will be passively declared. # queue_passive = false + ## Additional arguments when consuming from Queue + # queue_consume_arguments = { } + # queue_consume_arguments = {"x-stream-offset" = "first"} + ## A binding between the exchange and queue using this binding key is ## created. If unset, no binding is created. binding_key = "#" diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 0c7b9eb4f..fb309bb8f 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -40,9 +40,10 @@ type AMQPConsumer struct { MaxUndeliveredMessages int `toml:"max_undelivered_messages"` // Queue Name - Queue string `toml:"queue"` - QueueDurability string `toml:"queue_durability"` - QueuePassive bool `toml:"queue_passive"` + Queue string `toml:"queue"` + QueueDurability string `toml:"queue_durability"` + QueuePassive bool `toml:"queue_passive"` + QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"` // Binding Key BindingKey string `toml:"binding_key"` @@ -284,14 +285,19 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err return nil, fmt.Errorf("failed to set QoS: %w", err) } + consumeArgs := make(amqp.Table, len(a.QueueConsumeArguments)) + for k, v := range a.QueueConsumeArguments { + consumeArgs[k] = v + } + msgs, err := ch.Consume( - q.Name, // queue - "", // consumer - false, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // arguments + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + consumeArgs, // arguments ) if err != nil { return nil, fmt.Errorf("failed establishing connection to queue: %w", err) diff --git a/plugins/inputs/amqp_consumer/sample.conf b/plugins/inputs/amqp_consumer/sample.conf index 79cd48086..4af6b8824 100644 --- a/plugins/inputs/amqp_consumer/sample.conf +++ b/plugins/inputs/amqp_consumer/sample.conf @@ -34,6 +34,10 @@ ## If true, queue will be passively declared. # queue_passive = false + ## Additional arguments when consuming from Queue + # queue_consume_arguments = { } + # queue_consume_arguments = {"x-stream-offset" = "first"} + ## A binding between the exchange and queue using this binding key is ## created. If unset, no binding is created. binding_key = "#"