2022-05-25 22:48:59 +08:00
|
|
|
//go:generate ../../../tools/readme_config_includer/generator
|
2015-08-27 01:02:10 +08:00
|
|
|
package kafka
|
|
|
|
|
|
|
|
|
|
import (
|
2022-05-25 22:48:59 +08:00
|
|
|
_ "embed"
|
2023-02-22 20:38:16 +08:00
|
|
|
"errors"
|
2015-08-27 01:02:10 +08:00
|
|
|
"fmt"
|
2017-09-07 05:18:26 +08:00
|
|
|
"strings"
|
2020-01-16 07:26:50 +08:00
|
|
|
"time"
|
2016-02-04 03:59:34 +08:00
|
|
|
|
2019-06-04 08:34:48 +08:00
|
|
|
"github.com/Shopify/sarama"
|
2023-02-16 17:50:47 +08:00
|
|
|
"github.com/gofrs/uuid/v5"
|
2021-11-25 03:40:25 +08:00
|
|
|
|
2016-01-28 05:21:36 +08:00
|
|
|
"github.com/influxdata/telegraf"
|
2020-01-03 08:27:26 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/common/kafka"
|
2022-02-16 23:33:03 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/common/proxy"
|
2016-01-28 07:15:14 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
2016-02-11 06:50:07 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/serializers"
|
2015-08-27 01:02:10 +08:00
|
|
|
)
|
|
|
|
|
|
2022-05-25 22:48:59 +08:00
|
|
|
//go:embed sample.conf
|
|
|
|
|
var sampleConfig string
|
|
|
|
|
|
2017-09-07 05:18:26 +08:00
|
|
|
var ValidTopicSuffixMethods = []string{
|
|
|
|
|
"",
|
|
|
|
|
"measurement",
|
|
|
|
|
"tags",
|
2015-08-27 01:02:10 +08:00
|
|
|
}
|
|
|
|
|
|
2020-01-16 07:26:50 +08:00
|
|
|
var zeroTime = time.Unix(0, 0)
|
|
|
|
|
|
2020-10-29 00:16:59 +08:00
|
|
|
type Kafka struct {
|
2020-11-24 04:51:58 +08:00
|
|
|
Brokers []string `toml:"brokers"`
|
|
|
|
|
Topic string `toml:"topic"`
|
|
|
|
|
TopicTag string `toml:"topic_tag"`
|
|
|
|
|
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
|
|
|
|
|
TopicSuffix TopicSuffix `toml:"topic_suffix"`
|
|
|
|
|
RoutingTag string `toml:"routing_tag"`
|
|
|
|
|
RoutingKey string `toml:"routing_key"`
|
2020-10-29 00:16:59 +08:00
|
|
|
|
2022-02-16 23:33:03 +08:00
|
|
|
proxy.Socks5ProxyConfig
|
|
|
|
|
|
2020-10-29 00:16:59 +08:00
|
|
|
// Legacy TLS config options
|
|
|
|
|
// TLS client certificate
|
|
|
|
|
Certificate string
|
|
|
|
|
// TLS client key
|
|
|
|
|
Key string
|
|
|
|
|
// TLS certificate authority
|
|
|
|
|
CA string
|
|
|
|
|
|
2020-11-24 04:51:58 +08:00
|
|
|
kafka.WriteConfig
|
2020-10-29 00:16:59 +08:00
|
|
|
|
2022-12-06 04:54:08 +08:00
|
|
|
kafka.Logger
|
|
|
|
|
|
2020-10-29 00:16:59 +08:00
|
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
|
|
2021-03-26 06:06:03 +08:00
|
|
|
saramaConfig *sarama.Config
|
2020-10-29 00:16:59 +08:00
|
|
|
producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
|
|
|
|
|
producer sarama.SyncProducer
|
|
|
|
|
|
|
|
|
|
serializer serializers.Serializer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type TopicSuffix struct {
|
|
|
|
|
Method string `toml:"method"`
|
|
|
|
|
Keys []string `toml:"keys"`
|
|
|
|
|
Separator string `toml:"separator"`
|
|
|
|
|
}
|
2017-09-07 05:18:26 +08:00
|
|
|
|
|
|
|
|
func ValidateTopicSuffixMethod(method string) error {
|
|
|
|
|
for _, validMethod := range ValidTopicSuffixMethods {
|
|
|
|
|
if method == validMethod {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2021-11-25 03:40:25 +08:00
|
|
|
return fmt.Errorf("unknown topic suffix method provided: %s", method)
|
2017-09-07 05:18:26 +08:00
|
|
|
}
|
|
|
|
|
|
2022-05-25 22:48:59 +08:00
|
|
|
func (*Kafka) SampleConfig() string {
|
|
|
|
|
return sampleConfig
|
|
|
|
|
}
|
|
|
|
|
|
2020-03-11 04:38:26 +08:00
|
|
|
func (k *Kafka) GetTopicName(metric telegraf.Metric) (telegraf.Metric, string) {
|
|
|
|
|
topic := k.Topic
|
|
|
|
|
if k.TopicTag != "" {
|
|
|
|
|
if t, ok := metric.GetTag(k.TopicTag); ok {
|
|
|
|
|
topic = t
|
|
|
|
|
|
|
|
|
|
// If excluding the topic tag, a copy is required to avoid modifying
|
|
|
|
|
// the metric buffer.
|
|
|
|
|
if k.ExcludeTopicTag {
|
|
|
|
|
metric = metric.Copy()
|
|
|
|
|
metric.Accept()
|
|
|
|
|
metric.RemoveTag(k.TopicTag)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-09-07 05:18:26 +08:00
|
|
|
var topicName string
|
|
|
|
|
switch k.TopicSuffix.Method {
|
|
|
|
|
case "measurement":
|
2020-03-11 04:38:26 +08:00
|
|
|
topicName = topic + k.TopicSuffix.Separator + metric.Name()
|
2017-09-07 05:18:26 +08:00
|
|
|
case "tags":
|
|
|
|
|
var topicNameComponents []string
|
2020-03-11 04:38:26 +08:00
|
|
|
topicNameComponents = append(topicNameComponents, topic)
|
2017-09-07 05:18:26 +08:00
|
|
|
for _, tag := range k.TopicSuffix.Keys {
|
|
|
|
|
tagValue := metric.Tags()[tag]
|
|
|
|
|
if tagValue != "" {
|
|
|
|
|
topicNameComponents = append(topicNameComponents, tagValue)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
|
|
|
|
|
default:
|
2020-03-11 04:38:26 +08:00
|
|
|
topicName = topic
|
2017-09-07 05:18:26 +08:00
|
|
|
}
|
2020-03-11 04:38:26 +08:00
|
|
|
return metric, topicName
|
2017-09-07 05:18:26 +08:00
|
|
|
}
|
|
|
|
|
|
2016-02-11 06:50:07 +08:00
|
|
|
func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
|
|
|
|
|
k.serializer = serializer
|
|
|
|
|
}
|
|
|
|
|
|
2020-11-24 04:51:58 +08:00
|
|
|
func (k *Kafka) Init() error {
|
2022-12-06 04:54:08 +08:00
|
|
|
k.SetLogger()
|
2022-08-03 20:20:51 +08:00
|
|
|
|
2017-09-07 05:18:26 +08:00
|
|
|
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2016-02-04 03:59:34 +08:00
|
|
|
config := sarama.NewConfig()
|
2016-03-31 17:14:20 +08:00
|
|
|
|
2022-11-03 21:01:22 +08:00
|
|
|
if err := k.SetConfig(config, k.Log); err != nil {
|
2020-11-24 04:51:58 +08:00
|
|
|
return err
|
2018-08-14 07:40:18 +08:00
|
|
|
}
|
|
|
|
|
|
2021-03-26 06:06:03 +08:00
|
|
|
k.saramaConfig = config
|
|
|
|
|
|
2016-02-04 03:59:34 +08:00
|
|
|
// Legacy support ssl config
|
|
|
|
|
if k.Certificate != "" {
|
2018-05-05 07:33:23 +08:00
|
|
|
k.TLSCert = k.Certificate
|
|
|
|
|
k.TLSCA = k.CA
|
|
|
|
|
k.TLSKey = k.Key
|
2016-01-11 20:20:51 +08:00
|
|
|
}
|
|
|
|
|
|
2022-02-16 23:33:03 +08:00
|
|
|
if k.Socks5ProxyEnabled {
|
|
|
|
|
config.Net.Proxy.Enable = true
|
|
|
|
|
|
|
|
|
|
dialer, err := k.Socks5ProxyConfig.GetDialer()
|
|
|
|
|
if err != nil {
|
2023-02-22 20:38:16 +08:00
|
|
|
return fmt.Errorf("connecting to proxy server failed: %w", err)
|
2022-02-16 23:33:03 +08:00
|
|
|
}
|
|
|
|
|
config.Net.Proxy.Dialer = dialer
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-27 01:02:10 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-11-24 04:51:58 +08:00
|
|
|
func (k *Kafka) Connect() error {
|
2021-03-26 06:06:03 +08:00
|
|
|
producer, err := k.producerFunc(k.Brokers, k.saramaConfig)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
k.producer = producer
|
2020-11-24 04:51:58 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2015-08-27 01:02:10 +08:00
|
|
|
func (k *Kafka) Close() error {
|
|
|
|
|
return k.producer.Close()
|
|
|
|
|
}
|
|
|
|
|
|
2019-11-08 09:39:19 +08:00
|
|
|
func (k *Kafka) routingKey(metric telegraf.Metric) (string, error) {
|
2018-08-22 03:44:10 +08:00
|
|
|
if k.RoutingTag != "" {
|
|
|
|
|
key, ok := metric.GetTag(k.RoutingTag)
|
|
|
|
|
if ok {
|
2019-11-08 09:39:19 +08:00
|
|
|
return key, nil
|
2018-08-22 03:44:10 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if k.RoutingKey == "random" {
|
2019-11-08 09:39:19 +08:00
|
|
|
u, err := uuid.NewV4()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
return u.String(), nil
|
2018-08-22 03:44:10 +08:00
|
|
|
}
|
|
|
|
|
|
2019-11-08 09:39:19 +08:00
|
|
|
return k.RoutingKey, nil
|
2018-08-22 03:44:10 +08:00
|
|
|
}
|
|
|
|
|
|
2016-01-28 07:15:14 +08:00
|
|
|
func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
2018-08-01 06:08:04 +08:00
|
|
|
msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
|
2016-02-11 06:50:07 +08:00
|
|
|
for _, metric := range metrics {
|
2020-03-11 04:38:26 +08:00
|
|
|
metric, topic := k.GetTopicName(metric)
|
|
|
|
|
|
2016-12-06 23:38:59 +08:00
|
|
|
buf, err := k.serializer.Serialize(metric)
|
2016-02-11 06:50:07 +08:00
|
|
|
if err != nil {
|
2019-12-04 03:48:53 +08:00
|
|
|
k.Log.Debugf("Could not serialize metric: %v", err)
|
2019-06-04 08:34:48 +08:00
|
|
|
continue
|
2015-08-27 01:02:10 +08:00
|
|
|
}
|
2016-02-11 06:50:07 +08:00
|
|
|
|
2016-12-06 23:38:59 +08:00
|
|
|
m := &sarama.ProducerMessage{
|
2020-03-11 04:38:26 +08:00
|
|
|
Topic: topic,
|
2020-01-16 07:26:50 +08:00
|
|
|
Value: sarama.ByteEncoder(buf),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Negative timestamps are not allowed by the Kafka protocol.
|
|
|
|
|
if !metric.Time().Before(zeroTime) {
|
|
|
|
|
m.Timestamp = metric.Time()
|
2015-08-27 01:02:10 +08:00
|
|
|
}
|
2019-11-08 09:39:19 +08:00
|
|
|
|
|
|
|
|
key, err := k.routingKey(metric)
|
|
|
|
|
if err != nil {
|
2023-02-22 20:38:16 +08:00
|
|
|
return fmt.Errorf("could not generate routing key: %w", err)
|
2019-11-08 09:39:19 +08:00
|
|
|
}
|
|
|
|
|
|
2018-08-22 03:44:10 +08:00
|
|
|
if key != "" {
|
|
|
|
|
m.Key = sarama.StringEncoder(key)
|
2016-12-06 23:38:59 +08:00
|
|
|
}
|
2018-08-01 06:08:04 +08:00
|
|
|
msgs = append(msgs, m)
|
|
|
|
|
}
|
2016-12-06 23:38:59 +08:00
|
|
|
|
2018-08-01 06:08:04 +08:00
|
|
|
err := k.producer.SendMessages(msgs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// We could have many errors, return only the first encountered.
|
2023-02-22 20:38:16 +08:00
|
|
|
var errs sarama.ProducerErrors
|
|
|
|
|
if errors.As(err, &errs) {
|
2018-08-01 06:08:04 +08:00
|
|
|
for _, prodErr := range errs {
|
2023-02-22 20:38:16 +08:00
|
|
|
if errors.Is(prodErr.Err, sarama.ErrMessageSizeTooLarge) {
|
2019-12-04 03:48:53 +08:00
|
|
|
k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2023-02-22 20:38:16 +08:00
|
|
|
if errors.Is(prodErr.Err, sarama.ErrInvalidTimestamp) {
|
2022-11-08 23:05:55 +08:00
|
|
|
k.Log.Error(
|
|
|
|
|
"The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; " +
|
|
|
|
|
"dropping batch",
|
|
|
|
|
)
|
2018-08-18 04:51:21 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
2021-11-25 03:40:25 +08:00
|
|
|
return prodErr //nolint:staticcheck // Return first error encountered
|
2018-08-01 06:08:04 +08:00
|
|
|
}
|
2015-08-27 01:02:10 +08:00
|
|
|
}
|
2018-08-01 06:08:04 +08:00
|
|
|
return err
|
2015-08-27 01:02:10 +08:00
|
|
|
}
|
2018-08-01 06:08:04 +08:00
|
|
|
|
2015-08-27 01:02:10 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func init() {
|
2016-01-28 05:21:36 +08:00
|
|
|
outputs.Add("kafka", func() telegraf.Output {
|
2016-04-30 08:48:07 +08:00
|
|
|
return &Kafka{
|
2020-11-24 04:51:58 +08:00
|
|
|
WriteConfig: kafka.WriteConfig{
|
|
|
|
|
MaxRetry: 3,
|
|
|
|
|
RequiredAcks: -1,
|
|
|
|
|
},
|
2020-03-11 04:38:26 +08:00
|
|
|
producerFunc: sarama.NewSyncProducer,
|
2016-04-30 08:48:07 +08:00
|
|
|
}
|
2015-08-27 01:02:10 +08:00
|
|
|
})
|
|
|
|
|
}
|