diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index 6da4e7077..c86bbb38b 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -105,6 +105,10 @@ to use them. # queue_consume_arguments = { } # queue_consume_arguments = {"x-stream-offset" = "first"} + ## Additional queue arguments. + # queue_arguments = { } + # queue_arguments = {"x-max-length" = 100} + ## A binding between the exchange and queue using this binding key is ## created. If unset, no binding is created. binding_key = "#" diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index dba902bce..448efad87 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -44,6 +44,7 @@ type AMQPConsumer struct { Queue string `toml:"queue"` QueueDurability string `toml:"queue_durability"` QueuePassive bool `toml:"queue_passive"` + QueueArguments map[string]int `toml:"queue_arguments"` QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"` BindingKey string `toml:"binding_key"` PrefetchCount int `toml:"prefetch_count"` @@ -369,6 +370,11 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error) queueDurable = false } + queueArgs := make(amqp.Table, len(a.QueueArguments)) + for k, v := range a.QueueArguments { + queueArgs[k] = v + } + if a.QueuePassive { queue, err = channel.QueueDeclarePassive( a.Queue, // queue @@ -376,7 +382,7 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error) false, // delete when unused false, // exclusive false, // no-wait - nil, // arguments + queueArgs, // arguments ) } else { queue, err = channel.QueueDeclare( @@ -385,7 +391,7 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error) false, // delete when unused false, // exclusive false, // no-wait - nil, // arguments + queueArgs, // arguments ) } if err != nil { diff --git a/plugins/inputs/amqp_consumer/sample.conf b/plugins/inputs/amqp_consumer/sample.conf index 035c109fe..6e9772626 100644 --- a/plugins/inputs/amqp_consumer/sample.conf +++ b/plugins/inputs/amqp_consumer/sample.conf @@ -38,6 +38,10 @@ # queue_consume_arguments = { } # queue_consume_arguments = {"x-stream-offset" = "first"} + ## Additional queue arguments. + # queue_arguments = { } + # queue_arguments = {"x-max-length" = 100} + ## A binding between the exchange and queue using this binding key is ## created. If unset, no binding is created. binding_key = "#"