feat(inputs.amqp_consumer): Allow specification of queue arguments (#16141)
Co-authored-by: Robin Lucbernet <rlucbernet@maltem.com>
This commit is contained in:
parent
fe4246fab2
commit
d075815f29
|
|
@ -105,6 +105,10 @@ to use them.
|
||||||
# queue_consume_arguments = { }
|
# queue_consume_arguments = { }
|
||||||
# queue_consume_arguments = {"x-stream-offset" = "first"}
|
# queue_consume_arguments = {"x-stream-offset" = "first"}
|
||||||
|
|
||||||
|
## Additional queue arguments.
|
||||||
|
# queue_arguments = { }
|
||||||
|
# queue_arguments = {"x-max-length" = 100}
|
||||||
|
|
||||||
## A binding between the exchange and queue using this binding key is
|
## A binding between the exchange and queue using this binding key is
|
||||||
## created. If unset, no binding is created.
|
## created. If unset, no binding is created.
|
||||||
binding_key = "#"
|
binding_key = "#"
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ type AMQPConsumer struct {
|
||||||
Queue string `toml:"queue"`
|
Queue string `toml:"queue"`
|
||||||
QueueDurability string `toml:"queue_durability"`
|
QueueDurability string `toml:"queue_durability"`
|
||||||
QueuePassive bool `toml:"queue_passive"`
|
QueuePassive bool `toml:"queue_passive"`
|
||||||
|
QueueArguments map[string]int `toml:"queue_arguments"`
|
||||||
QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"`
|
QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"`
|
||||||
BindingKey string `toml:"binding_key"`
|
BindingKey string `toml:"binding_key"`
|
||||||
PrefetchCount int `toml:"prefetch_count"`
|
PrefetchCount int `toml:"prefetch_count"`
|
||||||
|
|
@ -369,6 +370,11 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error)
|
||||||
queueDurable = false
|
queueDurable = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queueArgs := make(amqp.Table, len(a.QueueArguments))
|
||||||
|
for k, v := range a.QueueArguments {
|
||||||
|
queueArgs[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
if a.QueuePassive {
|
if a.QueuePassive {
|
||||||
queue, err = channel.QueueDeclarePassive(
|
queue, err = channel.QueueDeclarePassive(
|
||||||
a.Queue, // queue
|
a.Queue, // queue
|
||||||
|
|
@ -376,7 +382,7 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error)
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
nil, // arguments
|
queueArgs, // arguments
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
queue, err = channel.QueueDeclare(
|
queue, err = channel.QueueDeclare(
|
||||||
|
|
@ -385,7 +391,7 @@ func (a *AMQPConsumer) declareQueue(channel *amqp.Channel) (*amqp.Queue, error)
|
||||||
false, // delete when unused
|
false, // delete when unused
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
nil, // arguments
|
queueArgs, // arguments
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,10 @@
|
||||||
# queue_consume_arguments = { }
|
# queue_consume_arguments = { }
|
||||||
# queue_consume_arguments = {"x-stream-offset" = "first"}
|
# queue_consume_arguments = {"x-stream-offset" = "first"}
|
||||||
|
|
||||||
|
## Additional queue arguments.
|
||||||
|
# queue_arguments = { }
|
||||||
|
# queue_arguments = {"x-max-length" = 100}
|
||||||
|
|
||||||
## A binding between the exchange and queue using this binding key is
|
## A binding between the exchange and queue using this binding key is
|
||||||
## created. If unset, no binding is created.
|
## created. If unset, no binding is created.
|
||||||
binding_key = "#"
|
binding_key = "#"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue