From 8d603cdc9ca7958d1d028dbbc0efd5b305855aba Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 17 Apr 2024 16:12:28 -0400 Subject: [PATCH] feat(inputs): Add framework to retry on startup errors (#15145) --- agent/agent.go | 72 +-- config/config.go | 1 + models/running_input.go | 97 +++- plugins/inputs/amqp_consumer/README.md | 17 + plugins/inputs/amqp_consumer/amqp_consumer.go | 39 +- .../amqp_consumer/amqp_consumer_test.go | 422 +++++++++++++++++- plugins/inputs/amqp_consumer/sample.conf | 3 + 7 files changed, 567 insertions(+), 84 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a60b813d4..526b15b10 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -339,27 +339,32 @@ func (a *Agent) startInputs( } for _, input := range inputs { - if si, ok := input.Input.(telegraf.ServiceInput); ok { - // Service input plugins are not normally subject to timestamp - // rounding except for when precision is set on the input plugin. - // - // This only applies to the accumulator passed to Start(), the - // Gather() accumulator does apply rounding according to the - // precision and interval agent/plugin settings. - var interval time.Duration - var precision time.Duration - if input.Config.Precision != 0 { - precision = input.Config.Precision + // Service input plugins are not normally subject to timestamp + // rounding except for when precision is set on the input plugin. + // + // This only applies to the accumulator passed to Start(), the + // Gather() accumulator does apply rounding according to the + // precision and interval agent/plugin settings. + var interval time.Duration + var precision time.Duration + if input.Config.Precision != 0 { + precision = input.Config.Precision + } + + acc := NewAccumulator(input, dst) + acc.SetPrecision(getPrecision(precision, interval)) + + if err := input.Start(acc); err != nil { + // If the model tells us to remove the plugin we do so without error + var fatalErr *internal.FatalError + if errors.As(err, &fatalErr) { + log.Printf("I! [agent] Failed to start %s, shutting down plugin: %s", input.LogName(), err) + continue } - acc := NewAccumulator(input, dst) - acc.SetPrecision(getPrecision(precision, interval)) + stopRunningInputs(unit.inputs) - err := si.Start(acc) - if err != nil { - stopServiceInputs(unit.inputs) - return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err) - } + return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err) } unit.inputs = append(unit.inputs, input) } @@ -424,7 +429,7 @@ func (a *Agent) runInputs( wg.Wait() log.Printf("D! [agent] Stopping service inputs") - stopServiceInputs(unit.inputs) + stopRunningInputs(unit.inputs) close(unit.dst) log.Printf("D! [agent] Input channel closed") @@ -444,18 +449,15 @@ func (a *Agent) testStartInputs( } for _, input := range inputs { - if si, ok := input.Input.(telegraf.ServiceInput); ok { - // Service input plugins are not subject to timestamp rounding. - // This only applies to the accumulator passed to Start(), the - // Gather() accumulator does apply rounding according to the - // precision agent setting. - acc := NewAccumulator(input, dst) - acc.SetPrecision(time.Nanosecond) + // Service input plugins are not subject to timestamp rounding. + // This only applies to the accumulator passed to Start(), the + // Gather() accumulator does apply rounding according to the + // precision agent setting. + acc := NewAccumulator(input, dst) + acc.SetPrecision(time.Nanosecond) - err := si.Start(acc) - if err != nil { - log.Printf("E! [agent] Starting input %s: %v", input.LogName(), err) - } + if err := input.Start(acc); err != nil { + log.Printf("E! [agent] Starting input %s: %v", input.LogName(), err) } unit.inputs = append(unit.inputs, input) @@ -525,18 +527,16 @@ func (a *Agent) testRunInputs( } log.Printf("D! [agent] Stopping service inputs") - stopServiceInputs(unit.inputs) + stopRunningInputs(unit.inputs) close(unit.dst) log.Printf("D! [agent] Input channel closed") } -// stopServiceInputs stops all service inputs. -func stopServiceInputs(inputs []*models.RunningInput) { +// stopRunningInputs stops all service inputs. +func stopRunningInputs(inputs []*models.RunningInput) { for _, input := range inputs { - if si, ok := input.Input.(telegraf.ServiceInput); ok { - si.Stop() - } + input.Stop() } } diff --git a/config/config.go b/config/config.go index 2a255a1ea..da24c1ff8 100644 --- a/config/config.go +++ b/config/config.go @@ -1432,6 +1432,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e c.getFieldDuration(tbl, "precision", &cp.Precision) c.getFieldDuration(tbl, "collection_jitter", &cp.CollectionJitter) c.getFieldDuration(tbl, "collection_offset", &cp.CollectionOffset) + c.getFieldString(tbl, "startup_error_behavior", &cp.StartupErrorBehavior) c.getFieldString(tbl, "name_prefix", &cp.MeasurementPrefix) c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix) c.getFieldString(tbl, "name_override", &cp.NameOverride) diff --git a/models/running_input.go b/models/running_input.go index 527a22363..0b2197f05 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -1,9 +1,12 @@ package models import ( + "errors" + "fmt" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/selfstat" ) @@ -20,9 +23,14 @@ type RunningInput struct { log telegraf.Logger defaultTags map[string]string + startAcc telegraf.Accumulator + started bool + retries uint64 + MetricsGathered selfstat.Stat GatherTime selfstat.Stat GatherTimeouts selfstat.Stat + StartupErrors selfstat.Stat } func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { @@ -57,19 +65,25 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { "gather_timeouts", tags, ), + StartupErrors: selfstat.Register( + "write", + "startup_errors", + tags, + ), log: logger, } } // InputConfig is the common config for all inputs. type InputConfig struct { - Name string - Alias string - ID string - Interval time.Duration - CollectionJitter time.Duration - CollectionOffset time.Duration - Precision time.Duration + Name string + Alias string + ID string + Interval time.Duration + CollectionJitter time.Duration + CollectionOffset time.Duration + Precision time.Duration + StartupErrorBehavior string NameOverride string MeasurementPrefix string @@ -89,15 +103,60 @@ func (r *RunningInput) LogName() string { } func (r *RunningInput) Init() error { + switch r.Config.StartupErrorBehavior { + case "", "error", "retry", "ignore": + default: + return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) + } + if p, ok := r.Input.(telegraf.Initializer); ok { - err := p.Init() - if err != nil { - return err - } + return p.Init() } return nil } +func (r *RunningInput) Start(acc telegraf.Accumulator) error { + plugin, ok := r.Input.(telegraf.ServiceInput) + if !ok { + return nil + } + + // Try to start the plugin and exit early on success + r.startAcc = acc + err := plugin.Start(acc) + if err == nil { + r.started = true + return nil + } + r.StartupErrors.Incr(1) + + // Check if the plugin reports a retry-able error, otherwise we exit. + var serr *internal.StartupError + if !errors.As(err, &serr) || !serr.Retry { + return err + } + + // Handle the retry-able error depending on the configured behavior + switch r.Config.StartupErrorBehavior { + case "", "error": // fall-trough to return the actual error + case "retry": + r.log.Infof("Startup failed: %v; retrying...", err) + return nil + case "ignore": + return &internal.FatalError{Err: serr} + default: + r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) + } + + return err +} + +func (r *RunningInput) Stop() { + if plugin, ok := r.Input.(telegraf.ServiceInput); ok { + plugin.Stop() + } +} + func (r *RunningInput) ID() string { if p, ok := r.Input.(telegraf.PluginWithID); ok { return p.ID() @@ -145,6 +204,22 @@ func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric { } func (r *RunningInput) Gather(acc telegraf.Accumulator) error { + // Try to connect if we are not yet started up + if plugin, ok := r.Input.(telegraf.ServiceInput); ok && !r.started { + r.retries++ + if err := plugin.Start(r.startAcc); err != nil { + var serr *internal.StartupError + if !errors.As(err, &serr) || !serr.Retry || !serr.Partial { + r.StartupErrors.Incr(1) + return internal.ErrNotConnected + } + r.log.Debugf("Partially connected after %d attempts", r.retries) + } else { + r.started = true + r.log.Debugf("Successfully connected after %d attempts", r.retries) + } + } + start := time.Now() err := r.Input.Gather(acc) elapsed := time.Since(start) diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index d82e9bc88..b8f582e84 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -34,6 +34,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. [CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins +## Startup error behavior options + +In addition to the plugin-specific and global configuration settings the plugin +supports options for specifying the behavior when experiencing startup errors +using the `startup_error_behavior` setting. Available values are: + +- `error`: Telegraf with stop and exit in case of startup errors. This is the + default behavior. +- `ignore`: Telegraf will ignore startup errors for this plugin and disables it + but continues processing for all other plugins. +- `retry`: Telegraf will try to startup the plugin in every gather or write + cycle in case of startup errors. The plugin is disabled until + the startup succeeds. + ## Secret-store support This plugin supports secrets from secret-stores for the `username` and @@ -105,6 +119,9 @@ to use them. ## setting it too low may never flush the broker's messages. # max_undelivered_messages = 1000 + ## Timeout for establishing the connection to a broker + # timeout = "30s" + ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index c0661eca3..866b860dc 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -38,28 +38,19 @@ type AMQPConsumer struct { ExchangePassive bool `toml:"exchange_passive"` ExchangeArguments map[string]string `toml:"exchange_arguments"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` - - // Queue Name - Queue string `toml:"queue"` - QueueDurability string `toml:"queue_durability"` - QueuePassive bool `toml:"queue_passive"` - QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"` - - // Binding Key - BindingKey string `toml:"binding_key"` - - // Controls how many messages the server will try to keep on the network - // for consumers before receiving delivery acks. - PrefetchCount int - - // AMQP Auth method - AuthMethod string + Queue string `toml:"queue"` + QueueDurability string `toml:"queue_durability"` + QueuePassive bool `toml:"queue_passive"` + QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"` + BindingKey string `toml:"binding_key"` + PrefetchCount int `toml:"prefetch_count"` + AuthMethod string `toml:"auth_method"` + ContentEncoding string `toml:"content_encoding"` + MaxDecompressionSize config.Size `toml:"max_decompression_size"` + Timeout config.Duration `toml:"timeout"` + Log telegraf.Logger `toml:"-"` tls.ClientConfig - ContentEncoding string `toml:"content_encoding"` - MaxDecompressionSize config.Size `toml:"max_decompression_size"` - Log telegraf.Logger - deliveries map[telegraf.TrackingID]amqp.Delivery parser telegraf.Parser @@ -161,6 +152,7 @@ func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { amqpConfig := amqp.Config{ TLSClientConfig: tlsCfg, SASL: auth, // if nil, it will be PLAIN + Dial: amqp.DefaultDial(time.Duration(a.Timeout)), } return &amqpConfig, nil } @@ -242,7 +234,10 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err } if a.conn == nil { - return nil, errors.New("could not connect to any broker") + return nil, &internal.StartupError{ + Err: errors.New("could not connect to any broker"), + Retry: true, + } } ch, err := a.conn.Channel() @@ -493,6 +488,6 @@ func (a *AMQPConsumer) Stop() { func init() { inputs.Add("amqp_consumer", func() telegraf.Input { - return &AMQPConsumer{} + return &AMQPConsumer{Timeout: config.Duration(30 * time.Second)} }) } diff --git a/plugins/inputs/amqp_consumer/amqp_consumer_test.go b/plugins/inputs/amqp_consumer/amqp_consumer_test.go index f8b73b4b1..957addaab 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer_test.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer_test.go @@ -1,52 +1,444 @@ package amqp_consumer import ( + "context" + "fmt" "testing" + "time" + "github.com/docker/go-connections/nat" "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) func TestAutoEncoding(t *testing.T) { + // Setup a gzipped payload enc, err := internal.NewGzipEncoder() require.NoError(t, err) - payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`)) + payloadGZip, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`)) require.NoError(t, err) - var a AMQPConsumer + // Setup the plugin including the message parser + decoder, err := internal.NewContentDecoder("auto") + require.NoError(t, err) + plugin := &AMQPConsumer{ + deliveries: make(map[telegraf.TrackingID]amqp091.Delivery), + decoder: decoder, + } + parser := &influx.Parser{} require.NoError(t, parser.Init()) - a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery) - a.parser = parser - a.decoder, err = internal.NewContentDecoder("auto") - require.NoError(t, err) + plugin.SetParser(parser) - acc := &testutil.Accumulator{} - - d := amqp091.Delivery{ + // Setup the message creator + msg := amqp091.Delivery{ ContentEncoding: "gzip", - Body: payload, + Body: payloadGZip, } - err = a.onMessage(acc, d) - require.NoError(t, err) + + // Simulate a message receive event + var acc testutil.Accumulator + require.NoError(t, plugin.onMessage(&acc, msg)) acc.AssertContainsFields(t, "measurementName", map[string]interface{}{"fieldKey": "gzip"}) + // Check the decoding encIdentity, err := internal.NewIdentityEncoder() require.NoError(t, err) - payload, err = encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`)) + payload, err := encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`)) require.NoError(t, err) - d = amqp091.Delivery{ + // Setup a non-encoded payload + msg = amqp091.Delivery{ ContentEncoding: "not_gzip", Body: payload, } - err = a.onMessage(acc, d) + // Simulate a message receive event + require.NoError(t, plugin.onMessage(&acc, msg)) require.NoError(t, err) acc.AssertContainsFields(t, "measurementName2", map[string]interface{}{"fieldKey": "identity"}) } + +func TestIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Define common properties + servicePort := "5672" + vhost := "/" + exchange := "telegraf" + exchangeType := "direct" + queueName := "test" + bindingKey := "test" + + // Setup the container + container := testutil.Container{ + Image: "rabbitmq", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Server startup complete"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost) + + // Setup a AMQP producer to send messages + client, err := newProducer(url, vhost, exchange, exchangeType, queueName, bindingKey) + require.NoError(t, err) + defer client.close() + + // Setup the plugin with an Influx line-protocol parser + plugin := &AMQPConsumer{ + Brokers: []string{url}, + Username: config.NewSecret([]byte("guest")), + Password: config.NewSecret([]byte("guest")), + Timeout: config.Duration(3 * time.Second), + Exchange: exchange, + ExchangeType: exchangeType, + Queue: queueName, + BindingKey: bindingKey, + Log: testutil.Logger{}, + } + + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + + // Setup the metrics + metrics := []string{ + "test,source=A value=0i 1712780301000000000", + "test,source=B value=1i 1712780301000000100", + "test,source=C value=2i 1712780301000000200", + } + expexted := make([]telegraf.Metric, 0, len(metrics)) + for _, x := range metrics { + m, err := parser.Parse([]byte(x)) + require.NoError(t, err) + expexted = append(expexted, m...) + } + + // Start the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Write metrics + for _, x := range metrics { + require.NoError(t, client.write(exchange, queueName, []byte(x))) + } + + // Verify that the metrics were actually written + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expexted)) + }, 3*time.Second, 100*time.Millisecond) + + client.close() + plugin.Stop() + testutil.RequireMetricsEqual(t, expexted, acc.GetTelegrafMetrics()) +} + +func TestStartupErrorBehaviorError(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Define common properties + servicePort := "5672" + vhost := "/" + exchange := "telegraf" + exchangeType := "direct" + queueName := "test" + bindingKey := "test" + + // Setup the container + container := testutil.Container{ + Image: "rabbitmq", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Server startup complete"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost) + + // Pause the container for simulating connectivity issues + require.NoError(t, container.Pause()) + defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway + + // Setup the plugin with an Influx line-protocol parser + plugin := &AMQPConsumer{ + Brokers: []string{url}, + Username: config.NewSecret([]byte("guest")), + Password: config.NewSecret([]byte("guest")), + Timeout: config.Duration(1 * time.Second), + Exchange: exchange, + ExchangeType: exchangeType, + Queue: queueName, + BindingKey: bindingKey, + Log: testutil.Logger{}, + } + + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a model to be able to use the startup retry strategy + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "amqp", + }, + ) + require.NoError(t, model.Init()) + + // Starting the plugin will fail with an error because the container + // is paused. + var acc testutil.Accumulator + require.ErrorContains(t, model.Start(&acc), "could not connect to any broker") +} + +func TestStartupErrorBehaviorIgnore(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Define common properties + servicePort := "5672" + vhost := "/" + exchange := "telegraf" + exchangeType := "direct" + queueName := "test" + bindingKey := "test" + + // Setup the container + container := testutil.Container{ + Image: "rabbitmq", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Server startup complete"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost) + + // Pause the container for simulating connectivity issues + require.NoError(t, container.Pause()) + defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway + + // Setup the plugin with an Influx line-protocol parser + plugin := &AMQPConsumer{ + Brokers: []string{url}, + Username: config.NewSecret([]byte("guest")), + Password: config.NewSecret([]byte("guest")), + Timeout: config.Duration(1 * time.Second), + Exchange: exchange, + ExchangeType: exchangeType, + Queue: queueName, + BindingKey: bindingKey, + Log: testutil.Logger{}, + } + + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a model to be able to use the startup retry strategy + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "amqp", + StartupErrorBehavior: "ignore", + }, + ) + require.NoError(t, model.Init()) + + // Starting the plugin will fail because the container is paused. + // The model code should convert it to a fatal error for the agent to remove + // the plugin. + var acc testutil.Accumulator + err := model.Start(&acc) + require.ErrorContains(t, err, "could not connect to any broker") + var fatalErr *internal.FatalError + require.ErrorAs(t, err, &fatalErr) +} + +func TestStartupErrorBehaviorRetry(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Define common properties + servicePort := "5672" + vhost := "/" + exchange := "telegraf" + exchangeType := "direct" + queueName := "test" + bindingKey := "test" + + // Setup the container + container := testutil.Container{ + Image: "rabbitmq", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Server startup complete"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + url := fmt.Sprintf("amqp://%s:%s%s", container.Address, container.Ports[servicePort], vhost) + + // Pause the container for simulating connectivity issues + require.NoError(t, container.Pause()) + defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway + + // Setup the plugin with an Influx line-protocol parser + plugin := &AMQPConsumer{ + Brokers: []string{url}, + Username: config.NewSecret([]byte("guest")), + Password: config.NewSecret([]byte("guest")), + Timeout: config.Duration(1 * time.Second), + Exchange: exchange, + ExchangeType: exchangeType, + Queue: queueName, + BindingKey: bindingKey, + Log: testutil.Logger{}, + } + + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a model to be able to use the startup retry strategy + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "amqp", + StartupErrorBehavior: "retry", + }, + ) + require.NoError(t, model.Init()) + + // Setup the metrics + metrics := []string{ + "test,source=A value=0i 1712780301000000000", + "test,source=B value=1i 1712780301000000100", + "test,source=C value=2i 1712780301000000200", + } + expexted := make([]telegraf.Metric, 0, len(metrics)) + for _, x := range metrics { + m, err := parser.Parse([]byte(x)) + require.NoError(t, err) + expexted = append(expexted, m...) + } + + // Starting the plugin should succeed as we will retry to startup later + var acc testutil.Accumulator + require.NoError(t, model.Start(&acc)) + + // There should be no metrics as the plugin is not fully started up yet + require.Empty(t, acc.GetTelegrafMetrics()) + require.ErrorIs(t, model.Gather(&acc), internal.ErrNotConnected) + require.Equal(t, int64(2), model.StartupErrors.Get()) + + // Unpause the container, now writes should succeed + require.NoError(t, container.Resume()) + require.NoError(t, model.Gather(&acc)) + defer model.Stop() + + // Setup a AMQP producer and send messages + client, err := newProducer(url, vhost, exchange, exchangeType, queueName, bindingKey) + require.NoError(t, err) + defer client.close() + + // Write metrics + for _, x := range metrics { + require.NoError(t, client.write(exchange, queueName, []byte(x))) + } + + // Verify that the metrics were actually collected + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expexted)) + }, 3*time.Second, 100*time.Millisecond) + + client.close() + plugin.Stop() + testutil.RequireMetricsEqual(t, expexted, acc.GetTelegrafMetrics()) +} + +type producer struct { + conn *amqp091.Connection + channel *amqp091.Channel + queue amqp091.Queue +} + +func newProducer(url, vhost, exchange, exchangeType, queueName, key string) (*producer, error) { + cfg := amqp091.Config{ + Vhost: vhost, + Properties: amqp091.NewConnectionProperties(), + } + cfg.Properties.SetClientConnectionName("test-producer") + conn, err := amqp091.DialConfig(url, cfg) + if err != nil { + return nil, err + } + + channel, err := conn.Channel() + if err != nil { + return nil, err + } + + if err := channel.ExchangeDeclare(exchange, exchangeType, true, false, false, false, nil); err != nil { + return nil, err + } + + queue, err := channel.QueueDeclare(queueName, true, false, false, false, nil) + if err != nil { + return nil, err + } + + if err := channel.QueueBind(queue.Name, key, exchange, false, nil); err != nil { + return nil, err + } + + return &producer{ + conn: conn, + channel: channel, + queue: queue, + }, nil +} + +func (p *producer) close() { + p.channel.Close() + p.conn.Close() +} + +func (p *producer) write(exchange, key string, payload []byte) error { + msg := amqp091.Publishing{ + DeliveryMode: amqp091.Persistent, + Timestamp: time.Now(), + ContentType: "text/plain", + Body: payload, + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + return p.channel.PublishWithContext(ctx, exchange, key, true, false, msg) +} diff --git a/plugins/inputs/amqp_consumer/sample.conf b/plugins/inputs/amqp_consumer/sample.conf index 4af6b8824..035c109fe 100644 --- a/plugins/inputs/amqp_consumer/sample.conf +++ b/plugins/inputs/amqp_consumer/sample.conf @@ -57,6 +57,9 @@ ## setting it too low may never flush the broker's messages. # max_undelivered_messages = 1000 + ## Timeout for establishing the connection to a broker + # timeout = "30s" + ## Auth method. PLAIN and EXTERNAL are supported ## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as ## described here: https://www.rabbitmq.com/plugins.html