feat(inputs.kafka_consumer): Mark messages that failed parsing (#14585)

This commit is contained in:
Jeremy Kerfs 2024-01-17 11:45:27 -06:00 committed by GitHub
parent cda93709c1
commit b72be8e97d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 1 additions and 0 deletions

View File

@ -483,6 +483,7 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
metrics, err := h.parser.Parse(msg.Value)
if err != nil {
session.MarkMessage(msg, "")
h.release()
return err
}