chore: Fix linter findings for `revive:exported` in `plugins/inputs/k*` (#16091)

Paweł Żak 2024-10-31 22:20:00 +01:00 committed by GitHub
parent dcec9d1cea
commit 7c0fe8a3e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 455 additions and 452 deletions
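For context: revive's `exported` rule flags exported identifiers that lack a doc comment starting with their own name. For plugin-internal code the simpler fix, applied throughout this commit, is to unexport anything that is not part of the plugin's public surface. A minimal sketch of the pattern under assumed names (hypothetical `example` package, not taken from the diff):

```go
package example

// Plugin stays exported because Telegraf instantiates it from outside the
// package; revive's exported rule wants this doc comment to start with the
// identifier's name.
type Plugin struct {
	creator groupCreator // package-internal collaborator, unexported
}

// groupCreator is only referenced inside this package, so unexporting it
// (type, method, and all) takes it out of the rule's scope entirely.
type groupCreator interface {
	create(brokers []string) (consumerGroup, error)
}

type consumerGroup interface {
	Close() error
}
```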

View File

@ -17,11 +17,13 @@ type Bcache struct {
Log telegraf.Logger `toml:"-"`
}
func (*Bcache) SampleConfig() string { return sampleConfig }
func (b *Bcache) Init() error {
b.Log.Warn("current platform is not supported")
b.Log.Warn("Current platform is not supported")
return nil
}
func (*Bcache) SampleConfig() string { return sampleConfig }
func (*Bcache) Gather(_ telegraf.Accumulator) error { return nil }
func init() {

View File

@ -20,7 +20,7 @@ type Conntrack struct {
func (*Conntrack) SampleConfig() string { return sampleConfig }
func (c *Conntrack) Init() error {
c.Log.Warn("current platform is not supported")
c.Log.Warn("Current platform is not supported")
return nil
}

View File

@ -17,11 +17,13 @@ type Dpdk struct {
Log telegraf.Logger `toml:"-"`
}
func (*Dpdk) SampleConfig() string { return sampleConfig }
func (d *Dpdk) Init() error {
d.Log.Warn("current platform is not supported")
d.Log.Warn("Current platform is not supported")
return nil
}
func (*Dpdk) SampleConfig() string { return sampleConfig }
func (*Dpdk) Gather(_ telegraf.Accumulator) error { return nil }
func init() {

View File

@ -22,7 +22,7 @@ func (*Hugepages) SampleConfig() string {
}
func (h *Hugepages) Init() error {
h.Log.Warn("current platform is not supported")
h.Log.Warn("Current platform is not supported")
return nil
}

View File

@ -33,9 +33,6 @@ const (
reconnectDelay = 5 * time.Second
)
type empty struct{}
type semaphore chan empty
type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
Version string `toml:"kafka_version"`
@ -57,8 +54,8 @@ type KafkaConsumer struct {
Log telegraf.Logger `toml:"-"`
kafka.ReadConfig
ConsumerCreator ConsumerGroupCreator `toml:"-"`
consumer ConsumerGroup
consumerCreator consumerGroupCreator
consumer consumerGroup
config *sarama.Config
topicClient sarama.Client
@ -72,19 +69,50 @@ type KafkaConsumer struct {
cancel context.CancelFunc
}
type ConsumerGroup interface {
// consumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
type consumerGroupHandler struct {
maxMessageLen int
topicTag string
msgHeadersToTags map[string]bool
msgHeaderToMetricName string
timestampSource string
acc telegraf.TrackingAccumulator
sem semaphore
parser telegraf.Parser
wg sync.WaitGroup
cancel context.CancelFunc
mu sync.Mutex
undelivered map[telegraf.TrackingID]message
log telegraf.Logger
}
// message is an aggregate type binding the Kafka message and the session so that offsets can be updated.
type message struct {
message *sarama.ConsumerMessage
session sarama.ConsumerGroupSession
}
type (
empty struct{}
semaphore chan empty
)
type consumerGroup interface {
Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
Errors() <-chan error
Close() error
}
type ConsumerGroupCreator interface {
Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)
type consumerGroupCreator interface {
create(brokers []string, group string, cfg *sarama.Config) (consumerGroup, error)
}
type saramaCreator struct{}
func (*saramaCreator) Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error) {
func (*saramaCreator) create(brokers []string, group string, cfg *sarama.Config) (consumerGroup, error) {
return sarama.NewConsumerGroup(brokers, group, cfg)
}
@ -92,10 +120,6 @@ func (*KafkaConsumer) SampleConfig() string {
return sampleConfig
}
func (k *KafkaConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
}
func (k *KafkaConsumer) Init() error {
kafka.SetLogger(k.Log.Level())
@ -154,8 +178,8 @@ func (k *KafkaConsumer) Init() error {
return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy)
}
if k.ConsumerCreator == nil {
k.ConsumerCreator = &saramaCreator{}
if k.consumerCreator == nil {
k.consumerCreator = &saramaCreator{}
}
cfg.Net.ResolveCanonicalBootstrapServers = k.ResolveCanonicalBootstrapServersOnly
@ -192,6 +216,105 @@ func (k *KafkaConsumer) Init() error {
return nil
}
func (k *KafkaConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
}
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
var err error
// If TopicRegexps is set, add matches to Topics
if len(k.TopicRegexps) > 0 {
if err := k.refreshTopics(); err != nil {
return err
}
}
ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
if k.ConnectionStrategy != "defer" {
err = k.create()
if err != nil {
return &internal.StartupError{
Err: fmt.Errorf("create consumer: %w", err),
Retry: errors.Is(err, sarama.ErrOutOfBrokers),
}
}
k.startErrorAdder(acc)
}
// Start consumer goroutine
k.wg.Add(1)
go func() {
var err error
defer k.wg.Done()
if k.consumer == nil {
err = k.create()
if err != nil {
acc.AddError(fmt.Errorf("create consumer async: %w", err))
return
}
}
k.startErrorAdder(acc)
for ctx.Err() == nil {
handler := newConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.maxMessageLen = k.MaxMessageLen
handler.topicTag = k.TopicTag
handler.msgHeaderToMetricName = k.MsgHeaderAsMetricName
// if a message-header list is specified, pass it to the handler as a map
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
if len(k.MsgHeadersAsTags) > 0 {
for _, header := range k.MsgHeadersAsTags {
if k.MsgHeaderAsMetricName != header {
msgHeadersMap[header] = true
}
}
}
handler.msgHeadersToTags = msgHeadersMap
handler.timestampSource = k.TimestampSource
// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
// topic-update-checker fires.
topics := make([]string, len(k.allWantedTopics))
k.topicLock.Lock()
copy(topics, k.allWantedTopics)
k.topicLock.Unlock()
err := k.consumer.Consume(ctx, topics, handler)
if err != nil {
acc.AddError(fmt.Errorf("consume: %w", err))
internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
}
}
err = k.consumer.Close()
if err != nil {
acc.AddError(fmt.Errorf("close: %w", err))
}
}()
return nil
}
func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (k *KafkaConsumer) Stop() {
// Lock so that a topic refresh cannot start while we are stopping.
k.topicLock.Lock()
if k.topicClient != nil {
k.topicClient.Close()
}
k.topicLock.Unlock()
k.cancel()
k.wg.Wait()
}
func (k *KafkaConsumer) compileTopicRegexps() error {
// While we can add new topics matching extant regexps, we can't
// update that list on the fly. We compile them once at startup.
@ -272,7 +395,7 @@ func (k *KafkaConsumer) refreshTopics() error {
func (k *KafkaConsumer) create() error {
var err error
k.consumer, err = k.ConsumerCreator.Create(
k.consumer, err = k.consumerCreator.create(
k.Brokers,
k.ConsumerGroup,
k.config,
@ -291,108 +414,7 @@ func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
}()
}
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
var err error
// If TopicRegexps is set, add matches to Topics
if len(k.TopicRegexps) > 0 {
if err := k.refreshTopics(); err != nil {
return err
}
}
ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
if k.ConnectionStrategy != "defer" {
err = k.create()
if err != nil {
return &internal.StartupError{
Err: fmt.Errorf("create consumer: %w", err),
Retry: errors.Is(err, sarama.ErrOutOfBrokers),
}
}
k.startErrorAdder(acc)
}
// Start consumer goroutine
k.wg.Add(1)
go func() {
var err error
defer k.wg.Done()
if k.consumer == nil {
err = k.create()
if err != nil {
acc.AddError(fmt.Errorf("create consumer async: %w", err))
return
}
}
k.startErrorAdder(acc)
for ctx.Err() == nil {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
handler.MsgHeaderToMetricName = k.MsgHeaderAsMetricName
// if a message-header list is specified, pass it to the handler as a map
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
if len(k.MsgHeadersAsTags) > 0 {
for _, header := range k.MsgHeadersAsTags {
if k.MsgHeaderAsMetricName != header {
msgHeadersMap[header] = true
}
}
}
handler.MsgHeadersToTags = msgHeadersMap
handler.TimestampSource = k.TimestampSource
// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
// topic-update-checker fires.
topics := make([]string, len(k.allWantedTopics))
k.topicLock.Lock()
copy(topics, k.allWantedTopics)
k.topicLock.Unlock()
err := k.consumer.Consume(ctx, topics, handler)
if err != nil {
acc.AddError(fmt.Errorf("consume: %w", err))
internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
}
}
err = k.consumer.Close()
if err != nil {
acc.AddError(fmt.Errorf("close: %w", err))
}
}()
return nil
}
func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (k *KafkaConsumer) Stop() {
// Lock so that a topic refresh cannot start while we are stopping.
k.topicLock.Lock()
if k.topicClient != nil {
k.topicClient.Close()
}
k.topicLock.Unlock()
k.cancel()
k.wg.Wait()
}
// message is an aggregate type binding the Kafka message and the session so that offsets can be updated.
type message struct {
message *sarama.ConsumerMessage
session sarama.ConsumerGroupSession
}
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *consumerGroupHandler {
func newConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *consumerGroupHandler {
handler := &consumerGroupHandler{
acc: acc.WithTracking(maxUndelivered),
sem: make(chan empty, maxUndelivered),
@ -403,28 +425,7 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parse
return handler
}
// consumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
type consumerGroupHandler struct {
MaxMessageLen int
TopicTag string
MsgHeadersToTags map[string]bool
MsgHeaderToMetricName string
TimestampSource string
acc telegraf.TrackingAccumulator
sem semaphore
parser telegraf.Parser
wg sync.WaitGroup
cancel context.CancelFunc
mu sync.Mutex
undelivered map[telegraf.TrackingID]message
log telegraf.Logger
}
// Setup is called once when a new session is opened. It sets up the handler
// and begins processing delivered messages.
// Setup is called once when a new session is opened. It sets up the handler and begins processing delivered messages.
func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
h.undelivered = make(map[telegraf.TrackingID]message)
@ -439,6 +440,38 @@ func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim is called once for each claim in a goroutine and must be thread-safe. Should run until the claim is closed.
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := session.Context()
for {
err := h.reserve(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return nil
case msg, ok := <-claim.Messages():
if !ok {
return nil
}
err := h.handle(session, msg)
if err != nil {
h.acc.AddError(err)
}
}
}
}
// Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
h.cancel()
h.wg.Wait()
return nil
}
// Run processes any delivered metrics during the lifetime of the session.
func (h *consumerGroupHandler) run(ctx context.Context) {
for {
@ -469,8 +502,8 @@ func (h *consumerGroupHandler) onDelivery(track telegraf.DeliveryInfo) {
<-h.sem
}
// Reserve blocks until there is an available slot for a new message.
func (h *consumerGroupHandler) Reserve(ctx context.Context) error {
// reserve blocks until there is an available slot for a new message.
func (h *consumerGroupHandler) reserve(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -483,14 +516,13 @@ func (h *consumerGroupHandler) release() {
<-h.sem
}
// Handle processes a message and, if successful, saves it to be acknowledged
// after delivery.
func (h *consumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
if h.MaxMessageLen != 0 && len(msg.Value) > h.MaxMessageLen {
// handle processes a message and, if successful, saves it to be acknowledged after delivery.
func (h *consumerGroupHandler) handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
if h.maxMessageLen != 0 && len(msg.Value) > h.maxMessageLen {
session.MarkMessage(msg, "")
h.release()
return fmt.Errorf("message exceeds max_message_len (actual %d, max %d)",
len(msg.Value), h.MaxMessageLen)
len(msg.Value), h.maxMessageLen)
}
metrics, err := h.parser.Parse(msg.Value)
@ -507,17 +539,17 @@ func (h *consumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
}
// Check if any message header should override the metric name or should be passed as a tag
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {
if len(h.msgHeadersToTags) > 0 || h.msgHeaderToMetricName != "" {
for _, header := range msg.Headers {
// convert to a string as the header and value are byte arrays.
headerKey := string(header.Key)
if _, exists := h.MsgHeadersToTags[headerKey]; exists {
if _, exists := h.msgHeadersToTags[headerKey]; exists {
// If the message header should be passed as a tag then add it to the metrics
for _, metric := range metrics {
metric.AddTag(headerKey, string(header.Value))
}
} else {
if h.MsgHeaderToMetricName == headerKey {
if h.msgHeaderToMetricName == headerKey {
for _, metric := range metrics {
metric.SetName(string(header.Value))
}
@ -526,15 +558,15 @@ func (h *consumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
}
}
// Add the topic name as a tag, using the TopicTag name specified in the config
if len(h.TopicTag) > 0 {
// Add the topic name as a tag, using the topicTag name specified in the config
if len(h.topicTag) > 0 {
for _, metric := range metrics {
metric.AddTag(h.TopicTag, msg.Topic)
metric.AddTag(h.topicTag, msg.Topic)
}
}
// Do override the metric timestamp if required
switch h.TimestampSource {
switch h.timestampSource {
case "inner":
for _, metric := range metrics {
metric.SetTime(msg.Timestamp)
@ -552,40 +584,6 @@ func (h *consumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
return nil
}
// ConsumeClaim is called once for each claim in a goroutine and must be
// thread-safe. Should run until the claim is closed.
func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := session.Context()
for {
err := h.Reserve(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return nil
case msg, ok := <-claim.Messages():
if !ok {
return nil
}
err := h.Handle(session, msg)
if err != nil {
h.acc.AddError(err)
}
}
}
}
// Cleanup stops the internal goroutine and is called after all ConsumeClaim
// functions have completed.
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
h.cancel()
h.wg.Wait()
return nil
}
func init() {
inputs.Add("kafka_consumer", func() telegraf.Input {
return &KafkaConsumer{}
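Aside on the `empty`/`semaphore` types that were folded into a single type block above: this is the standard buffered-channel semaphore idiom in Go, used here to bound the number of undelivered messages. A standalone sketch of the pattern (illustrative, not plugin code):

```go
package main

import "fmt"

type empty struct{}
type semaphore chan empty

func main() {
	// Capacity 2: at most two messages may be "in flight" at once.
	sem := make(semaphore, 2)

	acquire := func() { sem <- empty{} } // blocks while the buffer is full
	release := func() { <-sem }

	for i := 0; i < 5; i++ {
		acquire()
		fmt.Println("processing message", i)
		release() // in the plugin this happens on delivery, not inline
	}
}
```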

View File

@ -28,7 +28,7 @@ import (
"github.com/influxdata/telegraf/testutil"
)
type FakeConsumerGroup struct {
type fakeConsumerGroup struct {
brokers []string
group string
config *sarama.Config
@ -37,29 +37,29 @@ type FakeConsumerGroup struct {
errors chan error
}
func (g *FakeConsumerGroup) Consume(_ context.Context, _ []string, handler sarama.ConsumerGroupHandler) error {
func (g *fakeConsumerGroup) Consume(_ context.Context, _ []string, handler sarama.ConsumerGroupHandler) error {
g.handler = handler
return g.handler.Setup(nil)
}
func (g *FakeConsumerGroup) Errors() <-chan error {
func (g *fakeConsumerGroup) Errors() <-chan error {
return g.errors
}
func (g *FakeConsumerGroup) Close() error {
func (g *fakeConsumerGroup) Close() error {
close(g.errors)
return nil
}
type FakeCreator struct {
ConsumerGroup *FakeConsumerGroup
type fakeCreator struct {
consumerGroup *fakeConsumerGroup
}
func (c *FakeCreator) Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error) {
c.ConsumerGroup.brokers = brokers
c.ConsumerGroup.group = group
c.ConsumerGroup.config = cfg
return c.ConsumerGroup, nil
func (c *fakeCreator) create(brokers []string, group string, cfg *sarama.Config) (consumerGroup, error) {
c.consumerGroup.brokers = brokers
c.consumerGroup.group = group
c.consumerGroup.config = cfg
return c.consumerGroup, nil
}
func TestInit(t *testing.T) {
@ -206,8 +206,8 @@ func TestInit(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cg := &FakeConsumerGroup{}
tt.plugin.ConsumerCreator = &FakeCreator{ConsumerGroup: cg}
cg := &fakeConsumerGroup{}
tt.plugin.consumerCreator = &fakeCreator{consumerGroup: cg}
err := tt.plugin.Init()
if tt.initError {
require.Error(t, err)
@ -222,9 +222,9 @@ func TestInit(t *testing.T) {
}
func TestStartStop(t *testing.T) {
cg := &FakeConsumerGroup{errors: make(chan error)}
cg := &fakeConsumerGroup{errors: make(chan error)}
plugin := &KafkaConsumer{
ConsumerCreator: &FakeCreator{ConsumerGroup: cg},
consumerCreator: &fakeCreator{consumerGroup: cg},
Log: testutil.Logger{},
}
err := plugin.Init()
@ -301,7 +301,7 @@ func TestConsumerGroupHandlerLifecycle(t *testing.T) {
MetricName: "cpu",
DataType: "int",
}
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := newConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -335,7 +335,7 @@ func TestConsumerGroupHandlerConsumeClaim(t *testing.T) {
DataType: "int",
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg := newConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -457,15 +457,15 @@ func TestConsumerGroupHandlerHandle(t *testing.T) {
DataType: "int",
}
require.NoError(t, parser.Init())
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg.MaxMessageLen = tt.maxMessageLen
cg.TopicTag = tt.topicTag
cg := newConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg.maxMessageLen = tt.maxMessageLen
cg.topicTag = tt.topicTag
ctx := context.Background()
session := &FakeConsumerGroupSession{ctx: ctx}
require.NoError(t, cg.Reserve(ctx))
err := cg.Handle(session, tt.msg)
require.NoError(t, cg.reserve(ctx))
err := cg.handle(session, tt.msg)
if tt.expectedHandleError != "" {
require.Error(t, err)
require.EqualValues(t, tt.expectedHandleError, err.Error())
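A note on why the test fakes can follow these renames: Go satisfies interfaces structurally, and the test file compiles into the same package as the plugin, so unexported interfaces and methods are directly reachable from tests. A simplified sketch of the seam (assumed names, mirroring the shapes above):

```go
package kafka_consumer

type consumerGroup interface {
	Close() error
}

type consumerGroupCreator interface {
	create(brokers []string) (consumerGroup, error)
}

// fakeCreator satisfies consumerGroupCreator structurally; nothing needs to
// be exported because the test lives in the same package.
type fakeCreator struct{ group consumerGroup }

func (c *fakeCreator) create([]string) (consumerGroup, error) {
	return c.group, nil
}
```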

View File

@ -23,8 +23,8 @@ const (
)
type Kapacitor struct {
URLs []string `toml:"urls"`
Timeout config.Duration
URLs []string `toml:"urls"`
Timeout config.Duration `toml:"timeout"`
tls.ClientConfig
client *http.Client
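The Kapacitor change above pins the config key with an explicit `toml:"timeout"` tag instead of relying on the field name. As a sketch of how such tags drive decoding (using the BurntSushi/toml decoder purely for illustration; Telegraf's own config loader follows the same tag convention):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

type kapacitorConfig struct {
	URLs    []string `toml:"urls"`
	Timeout string   `toml:"timeout"` // the tag, not the field name, names the key
}

func main() {
	var c kapacitorConfig
	raw := `
urls = ["http://localhost:9092"]
timeout = "5s"
`
	if _, err := toml.Decode(raw, &c); err != nil {
		panic(err)
	}
	fmt.Println(c.URLs[0], c.Timeout) // http://localhost:9092 5s
}
```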

View File

@ -40,6 +40,10 @@ type Kernel struct {
procfs procfs.FS
}
func (*Kernel) SampleConfig() string {
return sampleConfig
}
func (k *Kernel) Init() error {
k.optCollect = make(map[string]bool, len(k.ConfigCollect))
for _, v := range k.ConfigCollect {
@ -63,10 +67,6 @@ func (k *Kernel) Init() error {
return nil
}
func (*Kernel) SampleConfig() string {
return sampleConfig
}
func (k *Kernel) Gather(acc telegraf.Accumulator) error {
data, err := k.getProcValueBytes(k.statFile)
if err != nil {

View File

@ -17,11 +17,13 @@ type Kernel struct {
Log telegraf.Logger `toml:"-"`
}
func (*Kernel) SampleConfig() string { return sampleConfig }
func (k *Kernel) Init() error {
k.Log.Warn("current platform is not supported")
k.Log.Warn("Current platform is not supported")
return nil
}
func (*Kernel) SampleConfig() string { return sampleConfig }
func (*Kernel) Gather(_ telegraf.Accumulator) error { return nil }
func init() {

View File

@ -17,11 +17,13 @@ type KernelVmstat struct {
Log telegraf.Logger `toml:"-"`
}
func (*KernelVmstat) SampleConfig() string { return sampleConfig }
func (k *KernelVmstat) Init() error {
k.Log.Warn("current platform is not supported")
k.Log.Warn("Current platform is not supported")
return nil
}
func (*KernelVmstat) SampleConfig() string { return sampleConfig }
func (*KernelVmstat) Gather(_ telegraf.Accumulator) error { return nil }
func init() {

View File

@ -24,6 +24,17 @@ var sampleConfig string
const statusPath = "/api/status"
type Kibana struct {
Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
Log telegraf.Logger `toml:"-"`
client *http.Client
common_http.HTTPClientConfig
}
type kibanaStatus struct {
Name string `json:"name"`
Version version `json:"version"`
@ -86,39 +97,6 @@ type heap struct {
SizeLimit int64 `json:"size_limit"`
}
type Kibana struct {
Local bool
Servers []string
Username string
Password string
Log telegraf.Logger `toml:"-"`
client *http.Client
common_http.HTTPClientConfig
}
func NewKibana() *Kibana {
return &Kibana{
HTTPClientConfig: common_http.HTTPClientConfig{
Timeout: config.Duration(5 * time.Second),
},
}
}
// perform status mapping
func mapHealthStatusToCode(s string) int {
switch strings.ToLower(s) {
case "green":
return 1
case "yellow":
return 2
case "red":
return 3
}
return 0
}
func (*Kibana) SampleConfig() string {
return sampleConfig
}
@ -248,8 +226,29 @@ func (k *Kibana) gatherJSONData(url string, v interface{}) (host string, err err
return request.Host, nil
}
// perform status mapping
func mapHealthStatusToCode(s string) int {
switch strings.ToLower(s) {
case "green":
return 1
case "yellow":
return 2
case "red":
return 3
}
return 0
}
func newKibana() *Kibana {
return &Kibana{
HTTPClientConfig: common_http.HTTPClientConfig{
Timeout: config.Duration(5 * time.Second),
},
}
}
func init() {
inputs.Add("kibana", func() telegraf.Input {
return NewKibana()
return newKibana()
})
}
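The relocated `mapHealthStatusToCode` turns Kibana's textual health into a numeric field that can be graphed and alerted on. A standalone check of the mapping (the function body mirrors the one shown in the diff):

```go
package main

import (
	"fmt"
	"strings"
)

func mapHealthStatusToCode(s string) int {
	switch strings.ToLower(s) {
	case "green":
		return 1
	case "yellow":
		return 2
	case "red":
		return 3
	}
	return 0
}

func main() {
	for _, s := range []string{"GREEN", "yellow", "red", "unknown"} {
		fmt.Printf("%s -> %d\n", s, mapHealthStatusToCode(s))
	}
}
```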

View File

@ -81,7 +81,7 @@ func TestGather(t *testing.T) {
}
func newKibanahWithClient() *Kibana {
ks := NewKibana()
ks := newKibana()
ks.client = &http.Client{}
return ks
}

View File

@ -31,22 +31,26 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
var (
once sync.Once
// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
maxSeq = strToBint(strings.Repeat("9", 129))
negOne *big.Int
)
const (
defaultMaxUndeliveredMessages = 1000
)
type (
DynamoDB struct {
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
}
KinesisConsumer struct {
StreamName string `toml:"streamname"`
ShardIteratorType string `toml:"shard_iterator_type"`
DynamoDB *DynamoDB `toml:"checkpoint_dynamodb"`
DynamoDB *dynamoDB `toml:"checkpoint_dynamodb"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
ContentEncoding string `toml:"content_encoding"`
Log telegraf.Logger
Log telegraf.Logger `toml:"-"`
cons *consumer.Consumer
parser telegraf.Parser
@ -68,47 +72,70 @@ type (
common_aws.CredentialConfig
}
dynamoDB struct {
AppName string `toml:"app_name"`
TableName string `toml:"table_name"`
}
checkpoint struct {
streamName string
shardID string
}
)
const (
defaultMaxUndeliveredMessages = 1000
)
type processContent func([]byte) ([]byte, error)
// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
var maxSeq = strToBint(strings.Repeat("9", 129))
func (*KinesisConsumer) SampleConfig() string {
return sampleConfig
}
func (k *KinesisConsumer) Init() error {
return k.configureProcessContentEncodingFunc()
}
func (k *KinesisConsumer) SetParser(parser telegraf.Parser) {
k.parser = parser
}
type TelegrafLoggerWrapper struct {
telegraf.Logger
}
func (t *TelegrafLoggerWrapper) Log(args ...interface{}) {
t.Trace(args...)
}
func (t *TelegrafLoggerWrapper) Logf(classification logging.Classification, format string, v ...interface{}) {
switch classification {
case logging.Debug:
format = "DEBUG " + format
case logging.Warn:
format = "WARN" + format
default:
format = "INFO " + format
func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
err := k.connect(ac)
if err != nil {
return err
}
t.Logger.Tracef(format, v...)
return nil
}
func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
if k.cons == nil {
return k.connect(acc)
}
k.lastSeqNum = maxSeq
return nil
}
func (k *KinesisConsumer) Stop() {
k.cancel()
k.wg.Wait()
}
// GetCheckpoint wraps the checkpoint's GetCheckpoint function (called by consumer library)
func (k *KinesisConsumer) GetCheckpoint(streamName, shardID string) (string, error) {
return k.checkpoint.GetCheckpoint(streamName, shardID)
}
// SetCheckpoint wraps the checkpoint's SetCheckpoint function (called by consumer library)
func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
if sequenceNumber == "" {
return errors.New("sequence number should not be empty")
}
k.checkpointTex.Lock()
k.checkpoints[sequenceNumber] = checkpoint{streamName: streamName, shardID: shardID}
k.checkpointTex.Unlock()
return nil
}
func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
@ -121,7 +148,7 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
cfg.BaseEndpoint = &k.EndpointURL
}
logWrapper := &TelegrafLoggerWrapper{k.Log}
logWrapper := &telegrafLoggerWrapper{k.Log}
cfg.Logger = logWrapper
cfg.ClientLogMode = aws.LogRetries
client := kinesis.NewFromConfig(cfg)
@ -195,15 +222,6 @@ func (k *KinesisConsumer) connect(ac telegraf.Accumulator) error {
return nil
}
func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
err := k.connect(ac)
if err != nil {
return err
}
return nil
}
func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
data, err := k.processContentEncodingFunc(r.Data)
if err != nil {
@ -270,48 +288,6 @@ func (k *KinesisConsumer) onDelivery(ctx context.Context) {
}
}
var negOne *big.Int
func strToBint(s string) *big.Int {
n, ok := new(big.Int).SetString(s, 10)
if !ok {
return negOne
}
return n
}
func (k *KinesisConsumer) Stop() {
k.cancel()
k.wg.Wait()
}
func (k *KinesisConsumer) Gather(acc telegraf.Accumulator) error {
if k.cons == nil {
return k.connect(acc)
}
k.lastSeqNum = maxSeq
return nil
}
// Get wraps the checkpoint's GetCheckpoint function (called by consumer library)
func (k *KinesisConsumer) GetCheckpoint(streamName, shardID string) (string, error) {
return k.checkpoint.GetCheckpoint(streamName, shardID)
}
// Set wraps the checkpoint's SetCheckpoint function (called by consumer library)
func (k *KinesisConsumer) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
if sequenceNumber == "" {
return errors.New("sequence number should not be empty")
}
k.checkpointTex.Lock()
k.checkpoints[sequenceNumber] = checkpoint{streamName: streamName, shardID: shardID}
k.checkpointTex.Unlock()
return nil
}
func processGzip(data []byte) ([]byte, error) {
zipData, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
@ -334,6 +310,14 @@ func processNoOp(data []byte) ([]byte, error) {
return data, nil
}
func strToBint(s string) *big.Int {
n, ok := new(big.Int).SetString(s, 10)
if !ok {
return negOne
}
return n
}
func (k *KinesisConsumer) configureProcessContentEncodingFunc() error {
switch k.ContentEncoding {
case "gzip":
@ -348,10 +332,27 @@ func (k *KinesisConsumer) configureProcessContentEncodingFunc() error {
return nil
}
func (k *KinesisConsumer) Init() error {
return k.configureProcessContentEncodingFunc()
type telegrafLoggerWrapper struct {
telegraf.Logger
}
func (t *telegrafLoggerWrapper) Log(args ...interface{}) {
t.Trace(args...)
}
func (t *telegrafLoggerWrapper) Logf(classification logging.Classification, format string, v ...interface{}) {
switch classification {
case logging.Debug:
format = "DEBUG " + format
case logging.Warn:
format = "WARN" + format
default:
format = "INFO " + format
}
t.Logger.Tracef(format, v...)
}
// noopStore implements the storage interface by discarding all checkpoints
type noopStore struct{}
func (n noopStore) SetCheckpoint(string, string, string) error { return nil }
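On `strToBint` and `maxSeq` above: Kinesis sequence numbers are long decimal strings far beyond uint64 range, hence math/big. A quick sketch of the comparison the plugin relies on (the sample sequence number is made up):

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// strToBint mirrors the helper in the diff: parse a decimal string into a
// big.Int; the plugin returns its negOne sentinel on failure, nil here.
func strToBint(s string) *big.Int {
	n, ok := new(big.Int).SetString(s, 10)
	if !ok {
		return nil
	}
	return n
}

func main() {
	maxSeq := strToBint(strings.Repeat("9", 129))
	seq := strToBint("49590338271490256608559692538361571095921575989136588898")
	fmt.Println(seq.Cmp(maxSeq) < 0) // true: within the allowed range
}
```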

View File

@ -4,25 +4,25 @@ import (
"github.com/vapourismo/knx-go/knx"
)
type KNXDummyInterface struct {
type knxDummyInterface struct {
inbound chan knx.GroupEvent
}
func NewDummyInterface() KNXDummyInterface {
di := KNXDummyInterface{}
func newDummyInterface() knxDummyInterface {
di := knxDummyInterface{}
di.inbound = make(chan knx.GroupEvent)
return di
}
func (di *KNXDummyInterface) Send(event knx.GroupEvent) {
func (di *knxDummyInterface) Send(event knx.GroupEvent) {
di.inbound <- event
}
func (di *KNXDummyInterface) Inbound() <-chan knx.GroupEvent {
func (di *knxDummyInterface) Inbound() <-chan knx.GroupEvent {
return di.inbound
}
func (di *KNXDummyInterface) Close() {
func (di *knxDummyInterface) Close() {
close(di.inbound)
}

View File

@ -19,9 +19,25 @@ import (
//go:embed sample.conf
var sampleConfig string
type KNXInterface interface {
Inbound() <-chan knx.GroupEvent
Close()
type KNXListener struct {
ServiceType string `toml:"service_type"`
ServiceAddress string `toml:"service_address"`
Measurements []measurement `toml:"measurement"`
Log telegraf.Logger `toml:"-"`
client knxInterface
gaTargetMap map[string]addressTarget
gaLogbook map[string]bool
wg sync.WaitGroup
connected atomic.Bool
}
type measurement struct {
Name string `toml:"name"`
Dpt string `toml:"dpt"`
AsString bool `toml:"as_string"`
Addresses []string `toml:"addresses"`
}
type addressTarget struct {
@ -30,43 +46,15 @@ type addressTarget struct {
datapoint dpt.Datapoint
}
type Measurement struct {
Name string `toml:"name"`
Dpt string `toml:"dpt"`
AsString bool `toml:"as_string"`
Addresses []string `toml:"addresses"`
}
type KNXListener struct {
ServiceType string `toml:"service_type"`
ServiceAddress string `toml:"service_address"`
Measurements []Measurement `toml:"measurement"`
Log telegraf.Logger `toml:"-"`
client KNXInterface
gaTargetMap map[string]addressTarget
gaLogbook map[string]bool
wg sync.WaitGroup
connected atomic.Bool
type knxInterface interface {
Inbound() <-chan knx.GroupEvent
Close()
}
func (*KNXListener) SampleConfig() string {
return sampleConfig
}
func (kl *KNXListener) Gather(acc telegraf.Accumulator) error {
if !kl.connected.Load() {
// We got disconnected for some reason, so try to reconnect in every
// gather cycle until we are reconnected
if err := kl.Start(acc); err != nil {
return fmt.Errorf("reconnecting to bus failed: %w", err)
}
}
return nil
}
func (kl *KNXListener) Init() error {
// Setup a logbook to track unknown GAs to avoid log-spamming
kl.gaLogbook = make(map[string]bool)
@ -119,7 +107,7 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
}
kl.client = &c
case "dummy":
c := NewDummyInterface()
c := newDummyInterface()
kl.client = &c
default:
return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress)
@ -139,6 +127,18 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
return nil
}
func (kl *KNXListener) Gather(acc telegraf.Accumulator) error {
if !kl.connected.Load() {
// We got disconnected for some reason, so try to reconnect in every
// gather cycle until we are reconnected
if err := kl.Start(acc); err != nil {
return fmt.Errorf("reconnecting to bus failed: %w", err)
}
}
return nil
}
func (kl *KNXListener) Stop() {
if kl.client != nil {
kl.client.Close()

View File

@ -111,9 +111,9 @@ func TestRegularReceives_DPT(t *testing.T) {
acc := &testutil.Accumulator{}
// Setup the unit-under-test
measurements := make([]Measurement, 0, len(testcases))
measurements := make([]measurement, 0, len(testcases))
for _, testcase := range testcases {
measurements = append(measurements, Measurement{
measurements = append(measurements, measurement{
Name: "test",
Dpt: testcase.dpt,
AsString: testcase.asstring,
@ -130,7 +130,7 @@ func TestRegularReceives_DPT(t *testing.T) {
// Setup the listener to test
err := listener.Start(acc)
require.NoError(t, err)
client := listener.client.(*KNXDummyInterface)
client := listener.client.(*knxDummyInterface)
tstart := time.Now()
@ -167,7 +167,7 @@ func TestRegularReceives_DPT(t *testing.T) {
func TestRegularReceives_MultipleMessages(t *testing.T) {
listener := KNXListener{
ServiceType: "dummy",
Measurements: []Measurement{
Measurements: []measurement{
{Name: "temperature", Dpt: "1.001", Addresses: []string{"1/1/1"}},
},
Log: testutil.Logger{Name: "knx_listener"},
@ -179,7 +179,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
// Setup the listener to test
err := listener.Start(acc)
require.NoError(t, err)
client := listener.client.(*KNXDummyInterface)
client := listener.client.(*knxDummyInterface)
testMessages := []message{
{"1/1/1", "1.001", true},
@ -220,7 +220,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
func TestReconnect(t *testing.T) {
listener := KNXListener{
ServiceType: "dummy",
Measurements: []Measurement{
Measurements: []measurement{
{Name: "temperature", Dpt: "1.001", Addresses: []string{"1/1/1"}},
},
Log: testutil.Logger{Name: "knx_listener"},
@ -232,7 +232,7 @@ func TestReconnect(t *testing.T) {
// Setup the listener to test
require.NoError(t, listener.Start(&acc))
defer listener.Stop()
client := listener.client.(*KNXDummyInterface)
client := listener.client.(*knxDummyInterface)
testMessages := []message{
{"1/1/1", "1.001", true},
@ -266,7 +266,7 @@ func TestReconnect(t *testing.T) {
require.Eventually(t, func() bool {
return listener.connected.Load()
}, 3*time.Second, 100*time.Millisecond, "no reconnect")
client = listener.client.(*KNXDummyInterface)
client = listener.client.(*knxDummyInterface)
for _, testcase := range testMessages {
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)

View File

@ -23,11 +23,38 @@ import (
//go:embed sample.conf
var sampleConfig string
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){
"daemonsets": collectDaemonSets,
"deployments": collectDeployments,
"endpoints": collectEndpoints,
"ingress": collectIngress,
"nodes": collectNodes,
"pods": collectPods,
"services": collectServices,
"statefulsets": collectStatefulSets,
"persistentvolumes": collectPersistentVolumes,
"persistentvolumeclaims": collectPersistentVolumeClaims,
"resourcequotas": collectResourceQuotas,
"secrets": collectSecrets,
}
const (
daemonSetMeasurement = "kubernetes_daemonset"
deploymentMeasurement = "kubernetes_deployment"
endpointMeasurement = "kubernetes_endpoint"
ingressMeasurement = "kubernetes_ingress"
nodeMeasurement = "kubernetes_node"
persistentVolumeMeasurement = "kubernetes_persistentvolume"
persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim"
podContainerMeasurement = "kubernetes_pod_container"
serviceMeasurement = "kubernetes_service"
statefulSetMeasurement = "kubernetes_statefulset"
resourcequotaMeasurement = "kubernetes_resourcequota"
certificateMeasurement = "kubernetes_certificate"
defaultServiceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
)
// KubernetesInventory represents the config object for the plugin.
type KubernetesInventory struct {
URL string `toml:"url"`
KubeletURL string `toml:"url_kubelet"`
@ -116,21 +143,6 @@ func (ki *KubernetesInventory) Gather(acc telegraf.Accumulator) (err error) {
return nil
}
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){
"daemonsets": collectDaemonSets,
"deployments": collectDeployments,
"endpoints": collectEndpoints,
"ingress": collectIngress,
"nodes": collectNodes,
"pods": collectPods,
"services": collectServices,
"statefulsets": collectStatefulSets,
"persistentvolumes": collectPersistentVolumes,
"persistentvolumeclaims": collectPersistentVolumeClaims,
"resourcequotas": collectResourceQuotas,
"secrets": collectSecrets,
}
func atoi(s string) int64 {
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
@ -155,6 +167,7 @@ func (ki *KubernetesInventory) convertQuantity(s string, m float64) int64 {
}
return int64(f * m)
}
func (ki *KubernetesInventory) queryPodsFromKubelet(url string, v interface{}) error {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
@ -187,21 +200,6 @@ func (ki *KubernetesInventory) createSelectorFilters() error {
return nil
}
const (
daemonSetMeasurement = "kubernetes_daemonset"
deploymentMeasurement = "kubernetes_deployment"
endpointMeasurement = "kubernetes_endpoint"
ingressMeasurement = "kubernetes_ingress"
nodeMeasurement = "kubernetes_node"
persistentVolumeMeasurement = "kubernetes_persistentvolume"
persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim"
podContainerMeasurement = "kubernetes_pod_container"
serviceMeasurement = "kubernetes_service"
statefulSetMeasurement = "kubernetes_statefulset"
resourcequotaMeasurement = "kubernetes_resourcequota"
certificateMeasurement = "kubernetes_certificate"
)
func init() {
inputs.Add("kube_inventory", func() telegraf.Input {
return &KubernetesInventory{

View File

@ -48,14 +48,6 @@ type Kubernetes struct {
httpClient *http.Client
}
func init() {
inputs.Add("kubernetes", func() telegraf.Input {
return &Kubernetes{
LabelExclude: []string{"*"},
}
})
}
func (*Kubernetes) SampleConfig() string {
return sampleConfig
}
@ -83,7 +75,6 @@ func (k *Kubernetes) Init() error {
return nil
}
// Gather collects kubernetes metrics from a given URL
func (k *Kubernetes) Gather(acc telegraf.Accumulator) error {
if k.URL != "" {
acc.AddError(k.gatherSummary(k.URL, acc))
@ -156,7 +147,7 @@ func getNodeAddress(addresses []v1.NodeAddress) string {
func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
summaryMetrics := &summaryMetrics{}
err := k.LoadJSON(baseURL+"/stats/summary", summaryMetrics)
err := k.loadJSON(baseURL+"/stats/summary", summaryMetrics)
if err != nil {
return err
}
@ -219,18 +210,18 @@ func buildNodeMetrics(summaryMetrics *summaryMetrics, acc telegraf.Accumulator,
acc.AddFields(metricName, fields, tags)
}
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Item, error) {
var podAPI Pods
err := k.LoadJSON(baseURL+"/pods", &podAPI)
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]item, error) {
var podAPI pods
err := k.loadJSON(baseURL+"/pods", &podAPI)
if err != nil {
return nil, err
}
podInfos := make([]Item, 0, len(podAPI.Items))
podInfos := make([]item, 0, len(podAPI.Items))
podInfos = append(podInfos, podAPI.Items...)
return podInfos, nil
}
func (k *Kubernetes) LoadJSON(url string, v interface{}) error {
func (k *Kubernetes) loadJSON(url string, v interface{}) error {
var req, err = http.NewRequest("GET", url, nil)
if err != nil {
return err
@ -282,7 +273,7 @@ func (k *Kubernetes) LoadJSON(url string, v interface{}) error {
return nil
}
func buildPodMetrics(summaryMetrics *summaryMetrics, podInfo []Item, labelFilter filter.Filter, acc telegraf.Accumulator) {
func buildPodMetrics(summaryMetrics *summaryMetrics, podInfo []item, labelFilter filter.Filter, acc telegraf.Accumulator) {
for _, pod := range summaryMetrics.Pods {
podLabels := make(map[string]string)
containerImages := make(map[string]string)
@ -368,3 +359,11 @@ func buildPodMetrics(summaryMetrics *summaryMetrics, podInfo []Item, labelFilter
acc.AddFields("kubernetes_pod_network", fields, tags)
}
}
func init() {
inputs.Add("kubernetes", func() telegraf.Input {
return &Kubernetes{
LabelExclude: []string{"*"},
}
})
}

View File

@ -1,27 +1,27 @@
package kubernetes
type Pods struct {
type pods struct {
Kind string `json:"kind"`
APIVersion string `json:"apiVersion"`
Items []Item `json:"items"`
Items []item `json:"items"`
}
type Item struct {
Metadata Metadata `json:"metadata"`
Spec Spec `json:"spec"`
type item struct {
Metadata metadata `json:"metadata"`
Spec spec `json:"spec"`
}
type Metadata struct {
type metadata struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Labels map[string]string `json:"labels"`
}
type Spec struct {
Containers []Container `json:"containers"`
type spec struct {
Containers []container `json:"containers"`
}
type Container struct {
type container struct {
Name string `json:"name"`
Image string `json:"image"`
}
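Unexporting these type names is safe for decoding: encoding/json matches on the `json:"..."` tags and the exported field names, never on the struct type's name. A quick self-contained check (sample payload invented for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type pods struct {
	Kind  string `json:"kind"`
	Items []item `json:"items"`
}

type item struct {
	Metadata metadata `json:"metadata"`
}

type metadata struct {
	Name string `json:"name"`
}

func main() {
	var p pods
	data := []byte(`{"kind":"PodList","items":[{"metadata":{"name":"web-0"}}]}`)
	if err := json.Unmarshal(data, &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Kind, p.Items[0].Metadata.Name) // PodList web-0
}
```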