diff --git a/CHANGELOG.md b/CHANGELOG.md index e8db62397..416dff1f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ # Changelog +## Unreleased + +### Important Changes + +- PR [#15796](https://github.com/influxdata/telegraf/pull/15796) changes the + delivery state update of un-parseable messages from `ACK` to `NACK` without + requeueing. This way, those messages are not lost and can optionally be + handled using a dead-letter exchange by other means. + ## v1.32.0 [2024-09-09] ### Important Changes diff --git a/plugins/inputs/amqp_consumer/README.md b/plugins/inputs/amqp_consumer/README.md index b8f582e84..3127c1da5 100644 --- a/plugins/inputs/amqp_consumer/README.md +++ b/plugins/inputs/amqp_consumer/README.md @@ -6,8 +6,8 @@ implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/). Metrics are read from a topic exchange using the configured queue and binding_key. -Message payload should be formatted in one of the [Telegraf Data -Formats](../../../docs/DATA_FORMATS_INPUT.md). +Message payload should be formatted in one of the +[Telegraf Data Formats](../../../docs/DATA_FORMATS_INPUT.md). For an introduction to AMQP see: @@ -153,10 +153,29 @@ to use them. data_format = "influx" ``` +## Message acknowledgement behavior + +This plugin tracks metrics to report the delivery state to the broker. + +Messages are **acknowledged** (ACK) in the broker if they were successfully +parsed and delivered to all corresponding output sinks. + +Messages are **not acknowledged** (NACK) if parsing of the messages fails and no +metrics were created. In this case requeueing is disabled so messages will not +be sent out to any other queue. The message will then be discarded or sent to a +dead-letter exchange depending on the server configuration. See +[RabitMQ documentation][rabbitmq_doc] for more details. + +Messages are **rejected** (REJECT) if the messages were parsed correctly but +could not be delivered e.g. due to output-service outages. Requeueing is +disabled in this case and messages will be discarded by the server. See +[RabitMQ documentation][rabbitmq_doc] for more details. + +[rabbitmq_doc]: https://www.rabbitmq.com/docs/confirms + ## Metrics -TODO +The format of metrics produced by this plugin depends on the content and +data format of received messages. ## Example Output - -TODO diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index f076261a3..f1e2fa8d0 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -422,11 +422,9 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error { onError := func() { - // Discard the message from the queue; will never be able to process - // this message. - rejErr := d.Ack(false) - if rejErr != nil { - a.Log.Errorf("Unable to reject message: %d: %v", d.DeliveryTag, rejErr) + // Discard the message from the queue; will never be able to process it + if err := d.Nack(false, false); err != nil { + a.Log.Errorf("Unable to NACK message: %d: %v", d.DeliveryTag, err) a.conn.Close() } }