feat(inputs.kafka_consumer): Allow to select the metric time source (#15790)
This commit is contained in:
parent
b672578a2a
commit
9e3e22094a
|
|
@ -70,6 +70,13 @@ to use them.
|
||||||
## option, it will be excluded from the msg_headers_as_tags list.
|
## option, it will be excluded from the msg_headers_as_tags list.
|
||||||
# msg_header_as_metric_name = ""
|
# msg_header_as_metric_name = ""
|
||||||
|
|
||||||
|
## Set metric(s) timestamp using the given source.
|
||||||
|
## Available options are:
|
||||||
|
## metric -- do not modify the metric timestamp
|
||||||
|
## inner -- use the inner message timestamp (Kafka v0.10+)
|
||||||
|
## outer -- use the outer (compressed) block timestamp (Kafka v0.10+)
|
||||||
|
# timestamp_source = "metric"
|
||||||
|
|
||||||
## Optional Client id
|
## Optional Client id
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@ type KafkaConsumer struct {
|
||||||
TopicTag string `toml:"topic_tag"`
|
TopicTag string `toml:"topic_tag"`
|
||||||
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
|
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
|
||||||
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
|
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
|
||||||
|
TimestampSource string `toml:"timestamp_source"`
|
||||||
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
||||||
ConnectionStrategy string `toml:"connection_strategy"`
|
ConnectionStrategy string `toml:"connection_strategy"`
|
||||||
ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"`
|
ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"`
|
||||||
|
|
@ -108,6 +109,14 @@ func (k *KafkaConsumer) Init() error {
|
||||||
k.ConsumerGroup = defaultConsumerGroup
|
k.ConsumerGroup = defaultConsumerGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch k.TimestampSource {
|
||||||
|
case "":
|
||||||
|
k.TimestampSource = "metric"
|
||||||
|
case "metric", "inner", "outer":
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid timestamp source %q", k.TimestampSource)
|
||||||
|
}
|
||||||
|
|
||||||
cfg := sarama.NewConfig()
|
cfg := sarama.NewConfig()
|
||||||
|
|
||||||
// Kafka version 0.10.2.0 is required for consumer groups.
|
// Kafka version 0.10.2.0 is required for consumer groups.
|
||||||
|
|
@ -334,6 +343,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
handler.MsgHeadersToTags = msgHeadersMap
|
handler.MsgHeadersToTags = msgHeadersMap
|
||||||
|
handler.TimestampSource = k.TimestampSource
|
||||||
|
|
||||||
// We need to copy allWantedTopics; the Consume() is
|
// We need to copy allWantedTopics; the Consume() is
|
||||||
// long-running and we can easily deadlock if our
|
// long-running and we can easily deadlock if our
|
||||||
|
|
@ -399,6 +409,7 @@ type ConsumerGroupHandler struct {
|
||||||
TopicTag string
|
TopicTag string
|
||||||
MsgHeadersToTags map[string]bool
|
MsgHeadersToTags map[string]bool
|
||||||
MsgHeaderToMetricName string
|
MsgHeaderToMetricName string
|
||||||
|
TimestampSource string
|
||||||
|
|
||||||
acc telegraf.TrackingAccumulator
|
acc telegraf.TrackingAccumulator
|
||||||
sem semaphore
|
sem semaphore
|
||||||
|
|
@ -495,12 +506,11 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
headerKey := ""
|
|
||||||
// Check if any message header should override metric name or should be pass as tag
|
// Check if any message header should override metric name or should be pass as tag
|
||||||
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {
|
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {
|
||||||
for _, header := range msg.Headers {
|
for _, header := range msg.Headers {
|
||||||
//convert to a string as the header and value are byte arrays.
|
//convert to a string as the header and value are byte arrays.
|
||||||
headerKey = string(header.Key)
|
headerKey := string(header.Key)
|
||||||
if _, exists := h.MsgHeadersToTags[headerKey]; exists {
|
if _, exists := h.MsgHeadersToTags[headerKey]; exists {
|
||||||
// If message header should be pass as tag then add it to the metrics
|
// If message header should be pass as tag then add it to the metrics
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
|
|
@ -523,6 +533,18 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Do override the metric timestamp if required
|
||||||
|
switch h.TimestampSource {
|
||||||
|
case "inner":
|
||||||
|
for _, metric := range metrics {
|
||||||
|
metric.SetTime(msg.Timestamp)
|
||||||
|
}
|
||||||
|
case "outer":
|
||||||
|
for _, metric := range metrics {
|
||||||
|
metric.SetTime(msg.BlockTimestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
id := h.acc.AddTrackingMetricGroup(metrics)
|
id := h.acc.AddTrackingMetricGroup(metrics)
|
||||||
h.undelivered[id] = Message{session: session, message: msg}
|
h.undelivered[id] = Message{session: session, message: msg}
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/common/kafka"
|
"github.com/influxdata/telegraf/plugins/common/kafka"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
|
@ -549,6 +550,81 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestKafkaTimestampSourceIntegration(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"test",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{"value": 42},
|
||||||
|
time.Unix(1704067200, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, source := range []string{"metric", "inner", "outer"} {
|
||||||
|
t.Run(source, func(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
kafkaContainer, err := kafkacontainer.Run(ctx, "confluentinc/confluent-local:7.5.0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer kafkaContainer.Terminate(ctx) //nolint:errcheck // ignored
|
||||||
|
|
||||||
|
brokers, err := kafkaContainer.Brokers(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Make kafka output
|
||||||
|
creator := outputs.Outputs["kafka"]
|
||||||
|
output, ok := creator().(*kafkaOutput.Kafka)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
s := &influxSerializer.Serializer{}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
output.SetSerializer(s)
|
||||||
|
output.Brokers = brokers
|
||||||
|
output.Topic = "Test"
|
||||||
|
output.Log = &testutil.Logger{}
|
||||||
|
|
||||||
|
require.NoError(t, output.Init())
|
||||||
|
require.NoError(t, output.Connect())
|
||||||
|
defer output.Close()
|
||||||
|
|
||||||
|
// Make kafka input
|
||||||
|
input := KafkaConsumer{
|
||||||
|
Brokers: brokers,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Topics: []string{"Test"},
|
||||||
|
MaxUndeliveredMessages: 1,
|
||||||
|
}
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
input.SetParser(parser)
|
||||||
|
require.NoError(t, input.Init())
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, input.Start(&acc))
|
||||||
|
defer input.Stop()
|
||||||
|
|
||||||
|
// Send the metrics and check that we got it back
|
||||||
|
sendTimestamp := time.Now().Unix()
|
||||||
|
require.NoError(t, output.Write(metrics))
|
||||||
|
require.Eventually(t, func() bool { return acc.NMetrics() > 0 }, 5*time.Second, 100*time.Millisecond)
|
||||||
|
actual := acc.GetTelegrafMetrics()
|
||||||
|
testutil.RequireMetricsEqual(t, metrics, actual, testutil.IgnoreTime())
|
||||||
|
|
||||||
|
// Check the timestamp
|
||||||
|
m := actual[0]
|
||||||
|
switch source {
|
||||||
|
case "metric":
|
||||||
|
require.EqualValues(t, 1704067200, m.Time().Unix())
|
||||||
|
case "inner", "outer":
|
||||||
|
require.GreaterOrEqual(t, sendTimestamp, m.Time().Unix())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestExponentialBackoff(t *testing.T) {
|
func TestExponentialBackoff(t *testing.T) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,13 @@
|
||||||
## option, it will be excluded from the msg_headers_as_tags list.
|
## option, it will be excluded from the msg_headers_as_tags list.
|
||||||
# msg_header_as_metric_name = ""
|
# msg_header_as_metric_name = ""
|
||||||
|
|
||||||
|
## Set metric(s) timestamp using the given source.
|
||||||
|
## Available options are:
|
||||||
|
## metric -- do not modify the metric timestamp
|
||||||
|
## inner -- use the inner message timestamp (Kafka v0.10+)
|
||||||
|
## outer -- use the outer (compressed) block timestamp (Kafka v0.10+)
|
||||||
|
# timestamp_source = "metric"
|
||||||
|
|
||||||
## Optional Client id
|
## Optional Client id
|
||||||
# client_id = "Telegraf"
|
# client_id = "Telegraf"
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue