fix(inputs.amqp_consumer): NACKing messages on non-delivery related errors (#15796)

This commit is contained in:
Sven Rebhan 2024-09-19 18:35:11 +02:00 committed by GitHub
parent 024b2e22db
commit 640eda0ca6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 36 additions and 10 deletions

View File

@ -1,6 +1,15 @@
<!-- markdownlint-disable MD024 -->
# Changelog
## Unreleased
### Important Changes
- PR [#15796](https://github.com/influxdata/telegraf/pull/15796) changes the
delivery state update of un-parseable messages from `ACK` to `NACK` without
requeueing. This way, those messages are not lost and can optionally be
handled using a dead-letter exchange by other means.
## v1.32.0 [2024-09-09]
### Important Changes

View File

@ -6,8 +6,8 @@ implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
Metrics are read from a topic exchange using the configured queue and
binding_key.
Message payload should be formatted in one of the [Telegraf Data
Formats](../../../docs/DATA_FORMATS_INPUT.md).
Message payload should be formatted in one of the
[Telegraf Data Formats](../../../docs/DATA_FORMATS_INPUT.md).
For an introduction to AMQP see:
@ -153,10 +153,29 @@ to use them.
data_format = "influx"
```
## Message acknowledgement behavior
This plugin tracks metrics to report the delivery state to the broker.
Messages are **acknowledged** (ACK) in the broker if they were successfully
parsed and delivered to all corresponding output sinks.
Messages are **not acknowledged** (NACK) if parsing of the messages fails and no
metrics were created. In this case requeueing is disabled so messages will not
be sent out to any other queue. The message will then be discarded or sent to a
dead-letter exchange depending on the server configuration. See
[RabitMQ documentation][rabbitmq_doc] for more details.
Messages are **rejected** (REJECT) if the messages were parsed correctly but
could not be delivered e.g. due to output-service outages. Requeueing is
disabled in this case and messages will be discarded by the server. See
[RabitMQ documentation][rabbitmq_doc] for more details.
[rabbitmq_doc]: https://www.rabbitmq.com/docs/confirms
## Metrics
TODO
The format of metrics produced by this plugin depends on the content and
data format of received messages.
## Example Output
TODO

View File

@ -422,11 +422,9 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a
func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
onError := func() {
// Discard the message from the queue; will never be able to process
// this message.
rejErr := d.Ack(false)
if rejErr != nil {
a.Log.Errorf("Unable to reject message: %d: %v", d.DeliveryTag, rejErr)
// Discard the message from the queue; will never be able to process it
if err := d.Nack(false, false); err != nil {
a.Log.Errorf("Unable to NACK message: %d: %v", d.DeliveryTag, err)
a.conn.Close()
}
}