feat(inputs.amqp_consumer): Add support to rabbitmq stream queue (#13496)
This commit is contained in:
parent
2e957cc003
commit
ef86635d21
|
|
@ -73,6 +73,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## If true, queue will be passively declared.
|
## If true, queue will be passively declared.
|
||||||
# queue_passive = false
|
# queue_passive = false
|
||||||
|
|
||||||
|
## Additional arguments when consuming from Queue
|
||||||
|
# queue_consume_arguments = { }
|
||||||
|
# queue_consume_arguments = {"x-stream-offset" = "first"}
|
||||||
|
|
||||||
## A binding between the exchange and queue using this binding key is
|
## A binding between the exchange and queue using this binding key is
|
||||||
## created. If unset, no binding is created.
|
## created. If unset, no binding is created.
|
||||||
binding_key = "#"
|
binding_key = "#"
|
||||||
|
|
|
||||||
|
|
@ -40,9 +40,10 @@ type AMQPConsumer struct {
|
||||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||||
|
|
||||||
// Queue Name
|
// Queue Name
|
||||||
Queue string `toml:"queue"`
|
Queue string `toml:"queue"`
|
||||||
QueueDurability string `toml:"queue_durability"`
|
QueueDurability string `toml:"queue_durability"`
|
||||||
QueuePassive bool `toml:"queue_passive"`
|
QueuePassive bool `toml:"queue_passive"`
|
||||||
|
QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"`
|
||||||
|
|
||||||
// Binding Key
|
// Binding Key
|
||||||
BindingKey string `toml:"binding_key"`
|
BindingKey string `toml:"binding_key"`
|
||||||
|
|
@ -284,14 +285,19 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
|
||||||
return nil, fmt.Errorf("failed to set QoS: %w", err)
|
return nil, fmt.Errorf("failed to set QoS: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumeArgs := make(amqp.Table, len(a.QueueConsumeArguments))
|
||||||
|
for k, v := range a.QueueConsumeArguments {
|
||||||
|
consumeArgs[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
msgs, err := ch.Consume(
|
msgs, err := ch.Consume(
|
||||||
q.Name, // queue
|
q.Name, // queue
|
||||||
"", // consumer
|
"", // consumer
|
||||||
false, // auto-ack
|
false, // auto-ack
|
||||||
false, // exclusive
|
false, // exclusive
|
||||||
false, // no-local
|
false, // no-local
|
||||||
false, // no-wait
|
false, // no-wait
|
||||||
nil, // arguments
|
consumeArgs, // arguments
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed establishing connection to queue: %w", err)
|
return nil, fmt.Errorf("failed establishing connection to queue: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,10 @@
|
||||||
## If true, queue will be passively declared.
|
## If true, queue will be passively declared.
|
||||||
# queue_passive = false
|
# queue_passive = false
|
||||||
|
|
||||||
|
## Additional arguments when consuming from Queue
|
||||||
|
# queue_consume_arguments = { }
|
||||||
|
# queue_consume_arguments = {"x-stream-offset" = "first"}
|
||||||
|
|
||||||
## A binding between the exchange and queue using this binding key is
|
## A binding between the exchange and queue using this binding key is
|
||||||
## created. If unset, no binding is created.
|
## created. If unset, no binding is created.
|
||||||
binding_key = "#"
|
binding_key = "#"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue