fix(outputs.kafka): Simplify send-error handling (#14154)
This commit is contained in:
parent
184038d5e6
commit
fd773b3e28
|
|
@ -223,21 +223,21 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
|||
if err != nil {
|
||||
// We could have many errors, return only the first encountered.
|
||||
var errs sarama.ProducerErrors
|
||||
if errors.As(err, &errs) {
|
||||
for _, prodErr := range errs {
|
||||
if errors.Is(prodErr.Err, sarama.ErrMessageSizeTooLarge) {
|
||||
k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch")
|
||||
return nil
|
||||
}
|
||||
if errors.Is(prodErr.Err, sarama.ErrInvalidTimestamp) {
|
||||
k.Log.Error(
|
||||
"The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; " +
|
||||
"dropping batch",
|
||||
)
|
||||
return nil
|
||||
}
|
||||
return prodErr //nolint:staticcheck // Return first error encountered
|
||||
if errors.As(err, &errs) && len(errs) > 0 {
|
||||
// Just return the first error encountered
|
||||
firstErr := errs[0]
|
||||
if errors.Is(firstErr.Err, sarama.ErrMessageSizeTooLarge) {
|
||||
k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch")
|
||||
return nil
|
||||
}
|
||||
if errors.Is(firstErr.Err, sarama.ErrInvalidTimestamp) {
|
||||
k.Log.Error(
|
||||
"The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; " +
|
||||
"dropping batch",
|
||||
)
|
||||
return nil
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue