From 501e920ef1a5e65747b3d759d18c21d9caf1b74b Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 9 Mar 2023 15:59:07 +0100 Subject: [PATCH] fix(inputs.amqp_consumer): Avoid deprecations when handling defaults (#12818) --- plugins/inputs/amqp_consumer/amqp_consumer.go | 66 +++++++++++-------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 5018497a3..aeb34e0f4 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -23,10 +23,6 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - defaultMaxUndeliveredMessages = 1000 -) - type empty struct{} type semaphore chan empty @@ -80,23 +76,46 @@ func (a *externalAuth) Response() string { return "\000" } -const ( - DefaultAuthMethod = "PLAIN" - - DefaultBroker = "amqp://localhost:5672/influxdb" - - DefaultExchangeType = "topic" - DefaultExchangeDurability = "durable" - - DefaultQueueDurability = "durable" - - DefaultPrefetchCount = 50 -) - func (*AMQPConsumer) SampleConfig() string { return sampleConfig } +func (a *AMQPConsumer) Init() error { + // Defaults + if a.URL != "" { + a.Brokers = append(a.Brokers, a.URL) + } + if len(a.Brokers) == 0 { + a.Brokers = []string{"amqp://localhost:5672/influxdb"} + } + + if a.AuthMethod == "" { + a.AuthMethod = "PLAIN" + } + + if a.ExchangeType == "" { + a.ExchangeType = "topic" + } + + if a.ExchangeDurability == "" { + a.ExchangeDurability = "durable" + } + + if a.QueueDurability == "" { + a.QueueDurability = "durable" + } + + if a.PrefetchCount == 0 { + a.PrefetchCount = 50 + } + + if a.MaxUndeliveredMessages == 0 { + a.MaxUndeliveredMessages = 1000 + } + + return nil +} + func (a *AMQPConsumer) SetParser(parser parsers.Parser) { a.parser = parser } @@ -190,9 +209,6 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) { brokers := a.Brokers - if len(brokers) == 0 { - brokers = []string{a.URL} - } p := rand.Perm(len(brokers)) for _, n := range p { @@ -454,14 +470,6 @@ func (a *AMQPConsumer) Stop() { func init() { inputs.Add("amqp_consumer", func() telegraf.Input { - return &AMQPConsumer{ - URL: DefaultBroker, - AuthMethod: DefaultAuthMethod, - ExchangeType: DefaultExchangeType, - ExchangeDurability: DefaultExchangeDurability, - QueueDurability: DefaultQueueDurability, - PrefetchCount: DefaultPrefetchCount, - MaxUndeliveredMessages: defaultMaxUndeliveredMessages, - } + return &AMQPConsumer{} }) }