2022-05-25 22:48:59 +08:00
//go:generate ../../../tools/readme_config_includer/generator
2015-08-27 01:02:10 +08:00
package kafka
import (
	_ "embed"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/gofrs/uuid"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/common/kafka"
	"github.com/influxdata/telegraf/plugins/common/proxy"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers"
)
2022-05-25 22:48:59 +08:00
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
//go:embed sample.conf
var sampleConfig string
2017-09-07 05:18:26 +08:00
// ValidTopicSuffixMethods lists the values accepted for TopicSuffix.Method.
// ValidateTopicSuffixMethod rejects any method that is not in this list.
// The empty string is included so that leaving the method unset is valid.
var ValidTopicSuffixMethods = []string{
	"",
	"measurement",
	"tags",
}
2020-01-16 07:26:50 +08:00
// zeroTime is the Unix epoch (second 0, nanosecond 0). Write compares metric
// timestamps against it and only forwards timestamps that are not before it.
var zeroTime = time.Unix(0, 0)
2020-10-29 00:16:59 +08:00
type Kafka struct {
2020-11-24 04:51:58 +08:00
Brokers [ ] string ` toml:"brokers" `
Topic string ` toml:"topic" `
TopicTag string ` toml:"topic_tag" `
ExcludeTopicTag bool ` toml:"exclude_topic_tag" `
TopicSuffix TopicSuffix ` toml:"topic_suffix" `
RoutingTag string ` toml:"routing_tag" `
RoutingKey string ` toml:"routing_key" `
2020-10-29 00:16:59 +08:00
2022-02-16 23:33:03 +08:00
proxy . Socks5ProxyConfig
2020-10-29 00:16:59 +08:00
// Legacy TLS config options
// TLS client certificate
Certificate string
// TLS client key
Key string
// TLS certificate authority
CA string
2020-11-24 04:51:58 +08:00
kafka . WriteConfig
2020-10-29 00:16:59 +08:00
Log telegraf . Logger ` toml:"-" `
2021-03-26 06:06:03 +08:00
saramaConfig * sarama . Config
2020-10-29 00:16:59 +08:00
producerFunc func ( addrs [ ] string , config * sarama . Config ) ( sarama . SyncProducer , error )
producer sarama . SyncProducer
serializer serializers . Serializer
}
// TopicSuffix configures how GetTopicName extends the base topic name.
//
// Method selects the strategy: "measurement" appends the metric name, "tags"
// appends the non-empty values of the tags listed in Keys, and any other
// value leaves the topic unchanged. Separator is placed between the base
// topic and each appended component.
type TopicSuffix struct {
	Method    string   `toml:"method"`
	Keys      []string `toml:"keys"`
	Separator string   `toml:"separator"`
}
2017-09-07 05:18:26 +08:00
2019-07-12 04:50:12 +08:00
// saramaLogPrefix is prepended to every line DebugLogger emits.
const saramaLogPrefix = "D! [sarama] "

// DebugLogger logs messages from sarama at the debug level.
type DebugLogger struct{}

// Print writes v to the standard logger, formatted as by fmt.Sprint, with
// the sarama debug prefix in front.
func (*DebugLogger) Print(v ...interface{}) {
	log.Print(prefixSaramaArgs(v)...)
}

// Printf writes a formatted message to the standard logger with the sarama
// debug prefix prepended to the format string.
func (*DebugLogger) Printf(format string, v ...interface{}) {
	log.Printf(saramaLogPrefix+format, v...)
}

// Println writes v to the standard logger, formatted as by fmt.Sprintln,
// with the sarama debug prefix passed as the first operand.
func (*DebugLogger) Println(v ...interface{}) {
	log.Println(prefixSaramaArgs(v)...)
}

// prefixSaramaArgs returns a new argument list consisting of the sarama
// debug prefix followed by v. The input slice is never modified.
func prefixSaramaArgs(v []interface{}) []interface{} {
	args := make([]interface{}, 0, len(v)+1)
	args = append(args, saramaLogPrefix)
	return append(args, v...)
}
2017-09-07 05:18:26 +08:00
func ValidateTopicSuffixMethod ( method string ) error {
for _ , validMethod := range ValidTopicSuffixMethods {
if method == validMethod {
return nil
}
}
2021-11-25 03:40:25 +08:00
return fmt . Errorf ( "unknown topic suffix method provided: %s" , method )
2017-09-07 05:18:26 +08:00
}
2022-05-25 22:48:59 +08:00
// SampleConfig returns the plugin's sample configuration text, i.e. the
// contents of the package-level sampleConfig variable.
func (*Kafka) SampleConfig() string {
	return sampleConfig
}
2020-03-11 04:38:26 +08:00
func ( k * Kafka ) GetTopicName ( metric telegraf . Metric ) ( telegraf . Metric , string ) {
topic := k . Topic
if k . TopicTag != "" {
if t , ok := metric . GetTag ( k . TopicTag ) ; ok {
topic = t
// If excluding the topic tag, a copy is required to avoid modifying
// the metric buffer.
if k . ExcludeTopicTag {
metric = metric . Copy ( )
metric . Accept ( )
metric . RemoveTag ( k . TopicTag )
}
}
}
2017-09-07 05:18:26 +08:00
var topicName string
switch k . TopicSuffix . Method {
case "measurement" :
2020-03-11 04:38:26 +08:00
topicName = topic + k . TopicSuffix . Separator + metric . Name ( )
2017-09-07 05:18:26 +08:00
case "tags" :
var topicNameComponents [ ] string
2020-03-11 04:38:26 +08:00
topicNameComponents = append ( topicNameComponents , topic )
2017-09-07 05:18:26 +08:00
for _ , tag := range k . TopicSuffix . Keys {
tagValue := metric . Tags ( ) [ tag ]
if tagValue != "" {
topicNameComponents = append ( topicNameComponents , tagValue )
}
}
topicName = strings . Join ( topicNameComponents , k . TopicSuffix . Separator )
default :
2020-03-11 04:38:26 +08:00
topicName = topic
2017-09-07 05:18:26 +08:00
}
2020-03-11 04:38:26 +08:00
return metric , topicName
2017-09-07 05:18:26 +08:00
}
2016-02-11 06:50:07 +08:00
// SetSerializer stores the serializer that Write uses to encode each metric
// into a message value.
func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
	k.serializer = serializer
}
2020-11-24 04:51:58 +08:00
func ( k * Kafka ) Init ( ) error {
2017-09-07 05:18:26 +08:00
err := ValidateTopicSuffixMethod ( k . TopicSuffix . Method )
if err != nil {
return err
}
2016-02-04 03:59:34 +08:00
config := sarama . NewConfig ( )
2016-03-31 17:14:20 +08:00
2020-11-24 04:51:58 +08:00
if err := k . SetConfig ( config ) ; err != nil {
return err
2018-08-14 07:40:18 +08:00
}
2021-03-26 06:06:03 +08:00
k . saramaConfig = config
2016-02-04 03:59:34 +08:00
// Legacy support ssl config
if k . Certificate != "" {
2018-05-05 07:33:23 +08:00
k . TLSCert = k . Certificate
k . TLSCA = k . CA
k . TLSKey = k . Key
2016-01-11 20:20:51 +08:00
}
2022-02-16 23:33:03 +08:00
if k . Socks5ProxyEnabled {
config . Net . Proxy . Enable = true
dialer , err := k . Socks5ProxyConfig . GetDialer ( )
if err != nil {
return fmt . Errorf ( "connecting to proxy server failed: %s" , err )
}
config . Net . Proxy . Dialer = dialer
}
2015-08-27 01:02:10 +08:00
return nil
}
2020-11-24 04:51:58 +08:00
func ( k * Kafka ) Connect ( ) error {
2021-03-26 06:06:03 +08:00
producer , err := k . producerFunc ( k . Brokers , k . saramaConfig )
if err != nil {
return err
}
k . producer = producer
2020-11-24 04:51:58 +08:00
return nil
}
2015-08-27 01:02:10 +08:00
// Close shuts down the producer created by Connect and returns its error.
//
// It is a no-op returning nil when no producer exists, for example when
// Close is called after a failed or skipped Connect; calling a method on the
// nil producer interface would otherwise panic.
func (k *Kafka) Close() error {
	if k.producer == nil {
		return nil
	}
	return k.producer.Close()
}
2019-11-08 09:39:19 +08:00
func ( k * Kafka ) routingKey ( metric telegraf . Metric ) ( string , error ) {
2018-08-22 03:44:10 +08:00
if k . RoutingTag != "" {
key , ok := metric . GetTag ( k . RoutingTag )
if ok {
2019-11-08 09:39:19 +08:00
return key , nil
2018-08-22 03:44:10 +08:00
}
}
if k . RoutingKey == "random" {
2019-11-08 09:39:19 +08:00
u , err := uuid . NewV4 ( )
if err != nil {
return "" , err
}
return u . String ( ) , nil
2018-08-22 03:44:10 +08:00
}
2019-11-08 09:39:19 +08:00
return k . RoutingKey , nil
2018-08-22 03:44:10 +08:00
}
2016-01-28 07:15:14 +08:00
func ( k * Kafka ) Write ( metrics [ ] telegraf . Metric ) error {
2018-08-01 06:08:04 +08:00
msgs := make ( [ ] * sarama . ProducerMessage , 0 , len ( metrics ) )
2016-02-11 06:50:07 +08:00
for _ , metric := range metrics {
2020-03-11 04:38:26 +08:00
metric , topic := k . GetTopicName ( metric )
2016-12-06 23:38:59 +08:00
buf , err := k . serializer . Serialize ( metric )
2016-02-11 06:50:07 +08:00
if err != nil {
2019-12-04 03:48:53 +08:00
k . Log . Debugf ( "Could not serialize metric: %v" , err )
2019-06-04 08:34:48 +08:00
continue
2015-08-27 01:02:10 +08:00
}
2016-02-11 06:50:07 +08:00
2016-12-06 23:38:59 +08:00
m := & sarama . ProducerMessage {
2020-03-11 04:38:26 +08:00
Topic : topic ,
2020-01-16 07:26:50 +08:00
Value : sarama . ByteEncoder ( buf ) ,
}
// Negative timestamps are not allowed by the Kafka protocol.
if ! metric . Time ( ) . Before ( zeroTime ) {
m . Timestamp = metric . Time ( )
2015-08-27 01:02:10 +08:00
}
2019-11-08 09:39:19 +08:00
key , err := k . routingKey ( metric )
if err != nil {
return fmt . Errorf ( "could not generate routing key: %v" , err )
}
2018-08-22 03:44:10 +08:00
if key != "" {
m . Key = sarama . StringEncoder ( key )
2016-12-06 23:38:59 +08:00
}
2018-08-01 06:08:04 +08:00
msgs = append ( msgs , m )
}
2016-12-06 23:38:59 +08:00
2018-08-01 06:08:04 +08:00
err := k . producer . SendMessages ( msgs )
if err != nil {
// We could have many errors, return only the first encountered.
if errs , ok := err . ( sarama . ProducerErrors ) ; ok {
for _ , prodErr := range errs {
2018-08-18 04:51:21 +08:00
if prodErr . Err == sarama . ErrMessageSizeTooLarge {
2019-12-04 03:48:53 +08:00
k . Log . Error ( "Message too large, consider increasing `max_message_bytes`; dropping batch" )
return nil
}
if prodErr . Err == sarama . ErrInvalidTimestamp {
k . Log . Error ( "The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch" )
2018-08-18 04:51:21 +08:00
return nil
}
2021-11-25 03:40:25 +08:00
return prodErr //nolint:staticcheck // Return first error encountered
2018-08-01 06:08:04 +08:00
}
2015-08-27 01:02:10 +08:00
}
2018-08-01 06:08:04 +08:00
return err
2015-08-27 01:02:10 +08:00
}
2018-08-01 06:08:04 +08:00
2015-08-27 01:02:10 +08:00
return nil
}
func init ( ) {
2019-07-12 04:50:12 +08:00
sarama . Logger = & DebugLogger { }
2016-01-28 05:21:36 +08:00
outputs . Add ( "kafka" , func ( ) telegraf . Output {
2016-04-30 08:48:07 +08:00
return & Kafka {
2020-11-24 04:51:58 +08:00
WriteConfig : kafka . WriteConfig {
MaxRetry : 3 ,
RequiredAcks : - 1 ,
} ,
2020-03-11 04:38:26 +08:00
producerFunc : sarama . NewSyncProducer ,
2016-04-30 08:48:07 +08:00
}
2015-08-27 01:02:10 +08:00
} )
}