feat(inputs): Add framework to retry on startup errors (#15145)

This commit is contained in:
Sven Rebhan 2024-04-17 16:12:28 -04:00 committed by GitHub
parent 31b2b505c0
commit 8d603cdc9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 567 additions and 84 deletions

View File

@ -339,27 +339,32 @@ func (a *Agent) startInputs(
}
for _, input := range inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not normally subject to timestamp
// rounding except for when precision is set on the input plugin.
//
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision and interval agent/plugin settings.
var interval time.Duration
var precision time.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
// Service input plugins are not normally subject to timestamp
// rounding except for when precision is set on the input plugin.
//
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision and interval agent/plugin settings.
var interval time.Duration
var precision time.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}
acc := NewAccumulator(input, dst)
acc.SetPrecision(getPrecision(precision, interval))
if err := input.Start(acc); err != nil {
// If the model tells us to remove the plugin we do so without error
var fatalErr *internal.FatalError
if errors.As(err, &fatalErr) {
log.Printf("I! [agent] Failed to start %s, shutting down plugin: %s", input.LogName(), err)
continue
}
acc := NewAccumulator(input, dst)
acc.SetPrecision(getPrecision(precision, interval))
stopRunningInputs(unit.inputs)
err := si.Start(acc)
if err != nil {
stopServiceInputs(unit.inputs)
return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err)
}
return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err)
}
unit.inputs = append(unit.inputs, input)
}
@ -424,7 +429,7 @@ func (a *Agent) runInputs(
wg.Wait()
log.Printf("D! [agent] Stopping service inputs")
stopServiceInputs(unit.inputs)
stopRunningInputs(unit.inputs)
close(unit.dst)
log.Printf("D! [agent] Input channel closed")
@ -444,18 +449,15 @@ func (a *Agent) testStartInputs(
}
for _, input := range inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not subject to timestamp rounding.
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)
// Service input plugins are not subject to timestamp rounding.
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)
err := si.Start(acc)
if err != nil {
log.Printf("E! [agent] Starting input %s: %v", input.LogName(), err)
}
if err := input.Start(acc); err != nil {
log.Printf("E! [agent] Starting input %s: %v", input.LogName(), err)
}
unit.inputs = append(unit.inputs, input)
@ -525,18 +527,16 @@ func (a *Agent) testRunInputs(
}
log.Printf("D! [agent] Stopping service inputs")
stopServiceInputs(unit.inputs)
stopRunningInputs(unit.inputs)
close(unit.dst)
log.Printf("D! [agent] Input channel closed")
}
// stopServiceInputs stops all service inputs.
func stopServiceInputs(inputs []*models.RunningInput) {
// stopRunningInputs stops all service inputs.
func stopRunningInputs(inputs []*models.RunningInput) {
for _, input := range inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
si.Stop()
}
input.Stop()
}
}

View File

@ -1432,6 +1432,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
c.getFieldDuration(tbl, "precision", &cp.Precision)
c.getFieldDuration(tbl, "collection_jitter", &cp.CollectionJitter)
c.getFieldDuration(tbl, "collection_offset", &cp.CollectionOffset)
c.getFieldString(tbl, "startup_error_behavior", &cp.StartupErrorBehavior)
c.getFieldString(tbl, "name_prefix", &cp.MeasurementPrefix)
c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &cp.NameOverride)

View File

@ -1,9 +1,12 @@
package models
import (
"errors"
"fmt"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/selfstat"
)
@ -20,9 +23,14 @@ type RunningInput struct {
log telegraf.Logger
defaultTags map[string]string
startAcc telegraf.Accumulator
started bool
retries uint64
MetricsGathered selfstat.Stat
GatherTime selfstat.Stat
GatherTimeouts selfstat.Stat
StartupErrors selfstat.Stat
}
func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
@ -57,19 +65,25 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
"gather_timeouts",
tags,
),
StartupErrors: selfstat.Register(
"write",
"startup_errors",
tags,
),
log: logger,
}
}
// InputConfig is the common config for all inputs.
type InputConfig struct {
Name string
Alias string
ID string
Interval time.Duration
CollectionJitter time.Duration
CollectionOffset time.Duration
Precision time.Duration
Name string
Alias string
ID string
Interval time.Duration
CollectionJitter time.Duration
CollectionOffset time.Duration
Precision time.Duration
StartupErrorBehavior string
NameOverride string
MeasurementPrefix string
@ -89,15 +103,60 @@ func (r *RunningInput) LogName() string {
}
func (r *RunningInput) Init() error {
switch r.Config.StartupErrorBehavior {
case "", "error", "retry", "ignore":
default:
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}
if p, ok := r.Input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
return p.Init()
}
return nil
}
func (r *RunningInput) Start(acc telegraf.Accumulator) error {
plugin, ok := r.Input.(telegraf.ServiceInput)
if !ok {
return nil
}
// Try to start the plugin and exit early on success
r.startAcc = acc
err := plugin.Start(acc)
if err == nil {
r.started = true
return nil
}
r.StartupErrors.Incr(1)
// Check if the plugin reports a retry-able error, otherwise we exit.
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry {
return err
}
// Handle the retry-able error depending on the configured behavior
switch r.Config.StartupErrorBehavior {
case "", "error": // fall-trough to return the actual error
case "retry":
r.log.Infof("Startup failed: %v; retrying...", err)
return nil
case "ignore":
return &internal.FatalError{Err: serr}
default:
r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}
return err
}
func (r *RunningInput) Stop() {
if plugin, ok := r.Input.(telegraf.ServiceInput); ok {
plugin.Stop()
}
}
func (r *RunningInput) ID() string {
if p, ok := r.Input.(telegraf.PluginWithID); ok {
return p.ID()
@ -145,6 +204,22 @@ func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
}
func (r *RunningInput) Gather(acc telegraf.Accumulator) error {
// Try to connect if we are not yet started up
if plugin, ok := r.Input.(telegraf.ServiceInput); ok && !r.started {
r.retries++
if err := plugin.Start(r.startAcc); err != nil {
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry || !serr.Partial {
r.StartupErrors.Incr(1)
return internal.ErrNotConnected
}
r.log.Debugf("Partially connected after %d attempts", r.retries)
} else {
r.started = true
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}
}
start := time.Now()
err := r.Input.Gather(acc)
elapsed := time.Since(start)

View File

@ -34,6 +34,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->
In addition to the plugin-specific and global configuration settings the plugin
supports options for specifying the behavior when experiencing startup errors
using the `startup_error_behavior` setting. Available values are:
- `error`: Telegraf with stop and exit in case of startup errors. This is the
default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin and disables it
but continues processing for all other plugins.
- `retry`: Telegraf will try to startup the plugin in every gather or write
cycle in case of startup errors. The plugin is disabled until
the startup succeeds.
## Secret-store support
This plugin supports secrets from secret-stores for the `username` and
@ -105,6 +119,9 @@ to use them.
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Timeout for establishing the connection to a broker
# timeout = "30s"
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html

View File

@ -38,28 +38,19 @@ type AMQPConsumer struct {
ExchangePassive bool `toml:"exchange_passive"`
ExchangeArguments map[string]string `toml:"exchange_arguments"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
// Queue Name
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
QueuePassive bool `toml:"queue_passive"`
QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"`
// Binding Key
BindingKey string `toml:"binding_key"`
// Controls how many messages the server will try to keep on the network
// for consumers before receiving delivery acks.
PrefetchCount int
// AMQP Auth method
AuthMethod string
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
QueuePassive bool `toml:"queue_passive"`
QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"`
BindingKey string `toml:"binding_key"`
PrefetchCount int `toml:"prefetch_count"`
AuthMethod string `toml:"auth_method"`
ContentEncoding string `toml:"content_encoding"`
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig
ContentEncoding string `toml:"content_encoding"`
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
Log telegraf.Logger
deliveries map[telegraf.TrackingID]amqp.Delivery
parser telegraf.Parser
@ -161,6 +152,7 @@ func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
amqpConfig := amqp.Config{
TLSClientConfig: tlsCfg,
SASL: auth, // if nil, it will be PLAIN
Dial: amqp.DefaultDial(time.Duration(a.Timeout)),
}
return &amqpConfig, nil
}
@ -242,7 +234,10 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
}
if a.conn == nil {
return nil, errors.New("could not connect to any broker")
return nil, &internal.StartupError{
Err: errors.New("could not connect to any broker"),
Retry: true,
}
}
ch, err := a.conn.Channel()
@ -493,6 +488,6 @@ func (a *AMQPConsumer) Stop() {
func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{}
return &AMQPConsumer{Timeout: config.Duration(30 * time.Second)}
})
}

View File

@ -1,52 +1,444 @@
package amqp_consumer
import (
"context"
"fmt"
"testing"
"time"
"github.com/docker/go-connections/nat"
"github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
func TestAutoEncoding(t *testing.T) {
// Setup a gzipped payload
enc, err := internal.NewGzipEncoder()
require.NoError(t, err)
payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`))
payloadGZip, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`))
require.NoError(t, err)
var a AMQPConsumer
// Setup the plugin including the message parser
decoder, err := internal.NewContentDecoder("auto")
require.NoError(t, err)
plugin := &AMQPConsumer{
deliveries: make(map[telegraf.TrackingID]amqp091.Delivery),
decoder: decoder,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery)
a.parser = parser
a.decoder, err = internal.NewContentDecoder("auto")
require.NoError(t, err)
plugin.SetParser(parser)
acc := &testutil.Accumulator{}
d := amqp091.Delivery{
// Setup the message creator
msg := amqp091.Delivery{
ContentEncoding: "gzip",
Body: payload,
Body: payloadGZip,
}
err = a.onMessage(acc, d)
require.NoError(t, err)
// Simulate a message receive event
var acc testutil.Accumulator
require.NoError(t, plugin.onMessage(&acc, msg))
acc.AssertContainsFields(t, "measurementName", map[string]interface{}{"fieldKey": "gzip"})
// Check the decoding
encIdentity, err := internal.NewIdentityEncoder()
require.NoError(t, err)
payload, err = encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`))
payload, err := encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`))
require.NoError(t, err)
d = amqp091.Delivery{
// Setup a non-encoded payload
msg = amqp091.Delivery{
ContentEncoding: "not_gzip",
Body: payload,
}
err = a.onMessage(acc, d)
// Simulate a message receive event
require.NoError(t, plugin.onMessage(&acc, msg))
require.NoError(t, err)
acc.AssertContainsFields(t, "measurementName2", map[string]interface{}{"fieldKey": "identity"})
}
func TestIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Define common properties
servicePort := "5672"
vhost := "/"
exchange := "telegraf"
exchangeType := "direct"
queueName := "test"
bindingKey := "test"
// Setup the container
container := testutil.Container{
Image: "rabbitmq",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("Server startup complete"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost)
// Setup a AMQP producer to send messages
client, err := newProducer(url, vhost, exchange, exchangeType, queueName, bindingKey)
require.NoError(t, err)
defer client.close()
// Setup the plugin with an Influx line-protocol parser
plugin := &AMQPConsumer{
Brokers: []string{url},
Username: config.NewSecret([]byte("guest")),
Password: config.NewSecret([]byte("guest")),
Timeout: config.Duration(3 * time.Second),
Exchange: exchange,
ExchangeType: exchangeType,
Queue: queueName,
BindingKey: bindingKey,
Log: testutil.Logger{},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
require.NoError(t, plugin.Init())
// Setup the metrics
metrics := []string{
"test,source=A value=0i 1712780301000000000",
"test,source=B value=1i 1712780301000000100",
"test,source=C value=2i 1712780301000000200",
}
expexted := make([]telegraf.Metric, 0, len(metrics))
for _, x := range metrics {
m, err := parser.Parse([]byte(x))
require.NoError(t, err)
expexted = append(expexted, m...)
}
// Start the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
// Write metrics
for _, x := range metrics {
require.NoError(t, client.write(exchange, queueName, []byte(x)))
}
// Verify that the metrics were actually written
require.Eventually(t, func() bool {
return acc.NMetrics() >= uint64(len(expexted))
}, 3*time.Second, 100*time.Millisecond)
client.close()
plugin.Stop()
testutil.RequireMetricsEqual(t, expexted, acc.GetTelegrafMetrics())
}
func TestStartupErrorBehaviorError(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Define common properties
servicePort := "5672"
vhost := "/"
exchange := "telegraf"
exchangeType := "direct"
queueName := "test"
bindingKey := "test"
// Setup the container
container := testutil.Container{
Image: "rabbitmq",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("Server startup complete"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost)
// Pause the container for simulating connectivity issues
require.NoError(t, container.Pause())
defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway
// Setup the plugin with an Influx line-protocol parser
plugin := &AMQPConsumer{
Brokers: []string{url},
Username: config.NewSecret([]byte("guest")),
Password: config.NewSecret([]byte("guest")),
Timeout: config.Duration(1 * time.Second),
Exchange: exchange,
ExchangeType: exchangeType,
Queue: queueName,
BindingKey: bindingKey,
Log: testutil.Logger{},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Create a model to be able to use the startup retry strategy
model := models.NewRunningInput(
plugin,
&models.InputConfig{
Name: "amqp",
},
)
require.NoError(t, model.Init())
// Starting the plugin will fail with an error because the container
// is paused.
var acc testutil.Accumulator
require.ErrorContains(t, model.Start(&acc), "could not connect to any broker")
}
func TestStartupErrorBehaviorIgnore(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Define common properties
servicePort := "5672"
vhost := "/"
exchange := "telegraf"
exchangeType := "direct"
queueName := "test"
bindingKey := "test"
// Setup the container
container := testutil.Container{
Image: "rabbitmq",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("Server startup complete"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost)
// Pause the container for simulating connectivity issues
require.NoError(t, container.Pause())
defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway
// Setup the plugin with an Influx line-protocol parser
plugin := &AMQPConsumer{
Brokers: []string{url},
Username: config.NewSecret([]byte("guest")),
Password: config.NewSecret([]byte("guest")),
Timeout: config.Duration(1 * time.Second),
Exchange: exchange,
ExchangeType: exchangeType,
Queue: queueName,
BindingKey: bindingKey,
Log: testutil.Logger{},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Create a model to be able to use the startup retry strategy
model := models.NewRunningInput(
plugin,
&models.InputConfig{
Name: "amqp",
StartupErrorBehavior: "ignore",
},
)
require.NoError(t, model.Init())
// Starting the plugin will fail because the container is paused.
// The model code should convert it to a fatal error for the agent to remove
// the plugin.
var acc testutil.Accumulator
err := model.Start(&acc)
require.ErrorContains(t, err, "could not connect to any broker")
var fatalErr *internal.FatalError
require.ErrorAs(t, err, &fatalErr)
}
func TestStartupErrorBehaviorRetry(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Define common properties
servicePort := "5672"
vhost := "/"
exchange := "telegraf"
exchangeType := "direct"
queueName := "test"
bindingKey := "test"
// Setup the container
container := testutil.Container{
Image: "rabbitmq",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("Server startup complete"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost)
// Pause the container for simulating connectivity issues
require.NoError(t, container.Pause())
defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway
// Setup the plugin with an Influx line-protocol parser
plugin := &AMQPConsumer{
Brokers: []string{url},
Username: config.NewSecret([]byte("guest")),
Password: config.NewSecret([]byte("guest")),
Timeout: config.Duration(1 * time.Second),
Exchange: exchange,
ExchangeType: exchangeType,
Queue: queueName,
BindingKey: bindingKey,
Log: testutil.Logger{},
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Create a model to be able to use the startup retry strategy
model := models.NewRunningInput(
plugin,
&models.InputConfig{
Name: "amqp",
StartupErrorBehavior: "retry",
},
)
require.NoError(t, model.Init())
// Setup the metrics
metrics := []string{
"test,source=A value=0i 1712780301000000000",
"test,source=B value=1i 1712780301000000100",
"test,source=C value=2i 1712780301000000200",
}
expexted := make([]telegraf.Metric, 0, len(metrics))
for _, x := range metrics {
m, err := parser.Parse([]byte(x))
require.NoError(t, err)
expexted = append(expexted, m...)
}
// Starting the plugin should succeed as we will retry to startup later
var acc testutil.Accumulator
require.NoError(t, model.Start(&acc))
// There should be no metrics as the plugin is not fully started up yet
require.Empty(t, acc.GetTelegrafMetrics())
require.ErrorIs(t, model.Gather(&acc), internal.ErrNotConnected)
require.Equal(t, int64(2), model.StartupErrors.Get())
// Unpause the container, now writes should succeed
require.NoError(t, container.Resume())
require.NoError(t, model.Gather(&acc))
defer model.Stop()
// Setup a AMQP producer and send messages
client, err := newProducer(url, vhost, exchange, exchangeType, queueName, bindingKey)
require.NoError(t, err)
defer client.close()
// Write metrics
for _, x := range metrics {
require.NoError(t, client.write(exchange, queueName, []byte(x)))
}
// Verify that the metrics were actually collected
require.Eventually(t, func() bool {
return acc.NMetrics() >= uint64(len(expexted))
}, 3*time.Second, 100*time.Millisecond)
client.close()
plugin.Stop()
testutil.RequireMetricsEqual(t, expexted, acc.GetTelegrafMetrics())
}
type producer struct {
conn *amqp091.Connection
channel *amqp091.Channel
queue amqp091.Queue
}
func newProducer(url, vhost, exchange, exchangeType, queueName, key string) (*producer, error) {
cfg := amqp091.Config{
Vhost: vhost,
Properties: amqp091.NewConnectionProperties(),
}
cfg.Properties.SetClientConnectionName("test-producer")
conn, err := amqp091.DialConfig(url, cfg)
if err != nil {
return nil, err
}
channel, err := conn.Channel()
if err != nil {
return nil, err
}
if err := channel.ExchangeDeclare(exchange, exchangeType, true, false, false, false, nil); err != nil {
return nil, err
}
queue, err := channel.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
return nil, err
}
if err := channel.QueueBind(queue.Name, key, exchange, false, nil); err != nil {
return nil, err
}
return &producer{
conn: conn,
channel: channel,
queue: queue,
}, nil
}
func (p *producer) close() {
p.channel.Close()
p.conn.Close()
}
func (p *producer) write(exchange, key string, payload []byte) error {
msg := amqp091.Publishing{
DeliveryMode: amqp091.Persistent,
Timestamp: time.Now(),
ContentType: "text/plain",
Body: payload,
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
return p.channel.PublishWithContext(ctx, exchange, key, true, false, msg)
}

View File

@ -57,6 +57,9 @@
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Timeout for establishing the connection to a broker
# timeout = "30s"
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html