chore(outputs.amqp): Remove deprecated options from sample config (#15873)
This commit is contained in:
parent
ccdbc4c480
commit
9a8b5022e0
|
|
@ -33,10 +33,6 @@ to use them.
|
||||||
```toml @sample.conf
|
```toml @sample.conf
|
||||||
# Publishes metrics to an AMQP broker
|
# Publishes metrics to an AMQP broker
|
||||||
[[outputs.amqp]]
|
[[outputs.amqp]]
|
||||||
## Broker to publish to.
|
|
||||||
## deprecated in 1.7; use the brokers option
|
|
||||||
# url = "amqp://localhost:5672/influxdb"
|
|
||||||
|
|
||||||
## Brokers to publish to. If multiple brokers are specified a random broker
|
## Brokers to publish to. If multiple brokers are specified a random broker
|
||||||
## will be selected anytime a connection is established. This can be
|
## will be selected anytime a connection is established. This can be
|
||||||
## helpful for load balancing when not using a dedicated load balancer.
|
## helpful for load balancing when not using a dedicated load balancer.
|
||||||
|
|
@ -85,14 +81,6 @@ to use them.
|
||||||
## One of "transient" or "persistent".
|
## One of "transient" or "persistent".
|
||||||
# delivery_mode = "transient"
|
# delivery_mode = "transient"
|
||||||
|
|
||||||
## InfluxDB database added as a message header.
|
|
||||||
## deprecated in 1.7; use the headers option
|
|
||||||
# database = "telegraf"
|
|
||||||
|
|
||||||
## InfluxDB retention policy added as a message header
|
|
||||||
## deprecated in 1.7; use the headers option
|
|
||||||
# retention_policy = "default"
|
|
||||||
|
|
||||||
## Static headers added to each published message.
|
## Static headers added to each published message.
|
||||||
# headers = { }
|
# headers = { }
|
||||||
# headers = {"database" = "telegraf", "retention_policy" = "default"}
|
# headers = {"database" = "telegraf", "retention_policy" = "default"}
|
||||||
|
|
|
||||||
|
|
@ -88,29 +88,27 @@ func (q *AMQP) SetSerializer(serializer serializers.Serializer) {
|
||||||
q.serializer = serializer
|
q.serializer = serializer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *AMQP) Connect() error {
|
func (q *AMQP) Init() error {
|
||||||
if q.config == nil {
|
var err error
|
||||||
clientConfig, err := q.makeClientConfig()
|
q.config, err = q.makeClientConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
|
||||||
q.config = clientConfig
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
|
||||||
q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
|
q.encoder, err = internal.NewContentEncoder(q.ContentEncoding)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
q.client, err = q.connect(q.config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *AMQP) Connect() error {
|
||||||
|
var err error
|
||||||
|
q.client, err = q.connect(q.config)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (q *AMQP) Close() error {
|
func (q *AMQP) Close() error {
|
||||||
if q.client != nil {
|
if q.client != nil {
|
||||||
return q.client.Close()
|
return q.client.Close()
|
||||||
|
|
@ -323,13 +321,15 @@ func connect(clientConfig *ClientConfig) (Client, error) {
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("amqp", func() telegraf.Output {
|
outputs.Add("amqp", func() telegraf.Output {
|
||||||
return &AMQP{
|
return &AMQP{
|
||||||
URL: DefaultURL,
|
Brokers: []string{DefaultURL},
|
||||||
ExchangeType: DefaultExchangeType,
|
ExchangeType: DefaultExchangeType,
|
||||||
AuthMethod: DefaultAuthMethod,
|
AuthMethod: DefaultAuthMethod,
|
||||||
Database: DefaultDatabase,
|
Headers: map[string]string{
|
||||||
RetentionPolicy: DefaultRetentionPolicy,
|
"database": DefaultDatabase,
|
||||||
Timeout: config.Duration(time.Second * 5),
|
"retention_policy": DefaultRetentionPolicy,
|
||||||
connect: connect,
|
},
|
||||||
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
connect: connect,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,9 +52,11 @@ func TestConnect(t *testing.T) {
|
||||||
ExchangeType: DefaultExchangeType,
|
ExchangeType: DefaultExchangeType,
|
||||||
ExchangeDurability: "durable",
|
ExchangeDurability: "durable",
|
||||||
AuthMethod: DefaultAuthMethod,
|
AuthMethod: DefaultAuthMethod,
|
||||||
Database: DefaultDatabase,
|
Headers: map[string]string{
|
||||||
RetentionPolicy: DefaultRetentionPolicy,
|
"database": DefaultDatabase,
|
||||||
Timeout: config.Duration(time.Second * 5),
|
"retention_policy": DefaultRetentionPolicy,
|
||||||
|
},
|
||||||
|
Timeout: config.Duration(time.Second * 5),
|
||||||
connect: func(_ *ClientConfig) (Client, error) {
|
connect: func(_ *ClientConfig) (Client, error) {
|
||||||
return NewMockClient(), nil
|
return NewMockClient(), nil
|
||||||
},
|
},
|
||||||
|
|
@ -150,6 +152,7 @@ func TestConnect(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
require.NoError(t, tt.output.Init())
|
||||||
err := tt.output.Connect()
|
err := tt.output.Connect()
|
||||||
tt.errFunc(t, tt.output, err)
|
tt.errFunc(t, tt.output, err)
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,5 @@
|
||||||
# Publishes metrics to an AMQP broker
|
# Publishes metrics to an AMQP broker
|
||||||
[[outputs.amqp]]
|
[[outputs.amqp]]
|
||||||
## Broker to publish to.
|
|
||||||
## deprecated in 1.7; use the brokers option
|
|
||||||
# url = "amqp://localhost:5672/influxdb"
|
|
||||||
|
|
||||||
## Brokers to publish to. If multiple brokers are specified a random broker
|
## Brokers to publish to. If multiple brokers are specified a random broker
|
||||||
## will be selected anytime a connection is established. This can be
|
## will be selected anytime a connection is established. This can be
|
||||||
## helpful for load balancing when not using a dedicated load balancer.
|
## helpful for load balancing when not using a dedicated load balancer.
|
||||||
|
|
@ -52,14 +48,6 @@
|
||||||
## One of "transient" or "persistent".
|
## One of "transient" or "persistent".
|
||||||
# delivery_mode = "transient"
|
# delivery_mode = "transient"
|
||||||
|
|
||||||
## InfluxDB database added as a message header.
|
|
||||||
## deprecated in 1.7; use the headers option
|
|
||||||
# database = "telegraf"
|
|
||||||
|
|
||||||
## InfluxDB retention policy added as a message header
|
|
||||||
## deprecated in 1.7; use the headers option
|
|
||||||
# retention_policy = "default"
|
|
||||||
|
|
||||||
## Static headers added to each published message.
|
## Static headers added to each published message.
|
||||||
# headers = { }
|
# headers = { }
|
||||||
# headers = {"database" = "telegraf", "retention_policy" = "default"}
|
# headers = {"database" = "telegraf", "retention_policy" = "default"}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue