From bcafb1d24d0554188b66956f1264ad4fa5078efb Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Wed, 5 Jun 2024 14:22:55 -0600 Subject: [PATCH] chore: Print debug message when no messages generated from parse (#15463) --- internal/internal.go | 1 + plugins/inputs/amqp_consumer/amqp_consumer.go | 7 +++++++ plugins/inputs/cloud_pubsub/cloud_pubsub.go | 7 +++++++ .../cloud_pubsub_push/cloud_pubsub_push.go | 9 +++++++++ .../directory_monitor/directory_monitor.go | 9 +++++++++ .../eventhub_consumer/eventhub_consumer.go | 8 ++++++++ plugins/inputs/exec/exec.go | 9 +++++++++ plugins/inputs/execd/execd.go | 10 ++++++++++ plugins/inputs/file/file.go | 19 +++++++++++++++---- plugins/inputs/file/file_test.go | 10 ++++++++++ plugins/inputs/http/http.go | 8 ++++++++ .../http_listener_v2/http_listener_v2.go | 9 +++++++++ .../inputs/kafka_consumer/kafka_consumer.go | 8 ++++++++ .../kinesis_consumer/kinesis_consumer.go | 9 +++++++++ plugins/inputs/mqtt_consumer/mqtt_consumer.go | 3 +-- plugins/inputs/nats_consumer/nats_consumer.go | 8 ++++++++ .../inputs/socket_listener/socket_listener.go | 9 +++++++++ plugins/inputs/tail/tail.go | 9 ++++++++- 18 files changed, 145 insertions(+), 7 deletions(-) diff --git a/internal/internal.go b/internal/internal.go index 7886f1381..2fc0f6464 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -24,6 +24,7 @@ import ( ) const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" +const NoMetricsCreatedMsg = "No metrics were created from a message. Verify your parser settings. This message is only printed once." var once sync.Once diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 3a9305d8e..f076261a3 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -23,6 +23,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + type empty struct{} type semaphore chan empty @@ -441,6 +443,11 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive onError() return err } + if len(metrics) == 0 { + once.Do(func() { + a.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } id := acc.AddTrackingMetricGroup(metrics) a.deliveries[id] = d diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub.go b/plugins/inputs/cloud_pubsub/cloud_pubsub.go index 0d0c2b686..379243130 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub.go @@ -23,6 +23,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + type empty struct{} type semaphore chan empty @@ -190,6 +192,11 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error { if len(metrics) == 0 { msg.Ack() + + once.Do(func() { + ps.Log.Debug(internal.NoMetricsCreatedMsg) + }) + return nil } diff --git a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go index abe23d613..ff797520c 100644 --- a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go +++ b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -21,6 +22,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + // defaultMaxBodySize is the default maximum request body size, in bytes. // if the request body is over this size, we will return an HTTP 413 error. // 500 MB @@ -198,6 +201,12 @@ func (p *PubSubPush) serveWrite(res http.ResponseWriter, req *http.Request) { return } + if len(metrics) == 0 { + once.Do(func() { + p.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + if p.AddMeta { for i := range metrics { for k, v := range payload.Msg.Atts { diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index 723c8e142..e752c179a 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -21,6 +21,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" @@ -30,6 +31,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + var ( defaultFilesToMonitor = []string{} defaultFilesToIgnore = []string{} @@ -306,6 +309,12 @@ func (monitor *DirectoryMonitor) parseMetrics(parser telegraf.Parser, line []byt return nil, err } + if len(metrics) == 0 { + once.Do(func() { + monitor.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + if monitor.FileTag != "" { for _, m := range metrics { m.AddTag(monitor.FileTag, filepath.Base(fileName)) diff --git a/plugins/inputs/eventhub_consumer/eventhub_consumer.go b/plugins/inputs/eventhub_consumer/eventhub_consumer.go index a90dfecc1..c7331b99b 100644 --- a/plugins/inputs/eventhub_consumer/eventhub_consumer.go +++ b/plugins/inputs/eventhub_consumer/eventhub_consumer.go @@ -20,6 +20,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + const ( defaultMaxUndeliveredMessages = 1000 ) @@ -268,6 +270,12 @@ func (e *EventHub) createMetrics(event *eventhubClient.Event) ([]telegraf.Metric return nil, err } + if len(metrics) == 0 { + once.Do(func() { + e.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + for i := range metrics { for _, field := range e.ApplicationPropertyFields { if val, ok := event.Get(field); ok { diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 5405bc544..0e0f8f02b 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/nagios" @@ -23,6 +24,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + const MaxStderrBytes int = 512 type exitcodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric @@ -118,6 +121,12 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync return } + if len(metrics) == 0 { + once.Do(func() { + e.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + if e.exitcodeHandler != nil { metrics = e.exitcodeHandler(metrics, runErr, errBuf) } diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index ea3657902..ee7cbc621 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -9,10 +9,12 @@ import ( "io" "os" "strings" + "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/process" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/inputs" @@ -22,6 +24,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + type Execd struct { Command []string `toml:"command"` Environment []string `toml:"environment"` @@ -102,6 +106,12 @@ func (e *Execd) cmdReadOut(out io.Reader) { e.acc.AddError(fmt.Errorf("parse error: %w", err)) } + if len(metrics) == 0 { + once.Do(func() { + e.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + for _, metric := range metrics { e.acc.AddMetric(metric) } diff --git a/plugins/inputs/file/file.go b/plugins/inputs/file/file.go index a0b13c472..45e3e7e3d 100644 --- a/plugins/inputs/file/file.go +++ b/plugins/inputs/file/file.go @@ -7,10 +7,12 @@ import ( "io" "os" "path/filepath" + "sync" "github.com/dimchansky/utfbom" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/inputs" @@ -19,11 +21,14 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + type File struct { - Files []string `toml:"files"` - FileTag string `toml:"file_tag"` - FilePathTag string `toml:"file_path_tag"` - CharacterEncoding string `toml:"character_encoding"` + Files []string `toml:"files"` + FileTag string `toml:"file_tag"` + FilePathTag string `toml:"file_path_tag"` + CharacterEncoding string `toml:"character_encoding"` + Log telegraf.Logger `toml:"-"` parserFunc telegraf.ParserFunc filenames []string @@ -108,6 +113,12 @@ func (f *File) readMetric(filename string) ([]telegraf.Metric, error) { if err != nil { return metrics, fmt.Errorf("could not parse %q: %w", filename, err) } + + if len(metrics) == 0 { + once.Do(func() { + f.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } return metrics, err } diff --git a/plugins/inputs/file/file_test.go b/plugins/inputs/file/file_test.go index 91e7fc8a8..a4ddb739e 100644 --- a/plugins/inputs/file/file_test.go +++ b/plugins/inputs/file/file_test.go @@ -28,6 +28,7 @@ func TestRefreshFilePaths(t *testing.T) { r := File{ Files: []string{filepath.Join(wd, "dev", "testfiles", "**.log")}, + Log: testutil.Logger{}, } err = r.Init() require.NoError(t, err) @@ -45,6 +46,7 @@ func TestFileTag(t *testing.T) { Files: []string{filepath.Join(wd, "dev", "testfiles", "json_a.log")}, FileTag: "filename", FilePathTag: "filepath", + Log: testutil.Logger{}, } require.NoError(t, r.Init()) @@ -70,6 +72,7 @@ func TestJSONParserCompile(t *testing.T) { wd, _ := os.Getwd() r := File{ Files: []string{filepath.Join(wd, "dev", "testfiles", "json_a.log")}, + Log: testutil.Logger{}, } require.NoError(t, r.Init()) @@ -89,6 +92,7 @@ func TestGrokParser(t *testing.T) { var acc testutil.Accumulator r := File{ Files: []string{filepath.Join(wd, "dev", "testfiles", "grok_a.log")}, + Log: testutil.Logger{}, } err := r.Init() require.NoError(t, err) @@ -191,6 +195,7 @@ func TestCharacterEncoding(t *testing.T) { plugin: &File{ Files: []string{"testdata/mtr-utf-8.csv"}, CharacterEncoding: "", + Log: testutil.Logger{}, }, csv: csv.Parser{ MetricName: "file", @@ -204,6 +209,7 @@ func TestCharacterEncoding(t *testing.T) { plugin: &File{ Files: []string{"testdata/mtr-utf-8.csv"}, CharacterEncoding: "utf-8", + Log: testutil.Logger{}, }, csv: csv.Parser{ MetricName: "file", @@ -217,6 +223,7 @@ func TestCharacterEncoding(t *testing.T) { plugin: &File{ Files: []string{"testdata/mtr-utf-16le.csv"}, CharacterEncoding: "utf-16le", + Log: testutil.Logger{}, }, csv: csv.Parser{ MetricName: "file", @@ -230,6 +237,7 @@ func TestCharacterEncoding(t *testing.T) { plugin: &File{ Files: []string{"testdata/mtr-utf-16be.csv"}, CharacterEncoding: "utf-16be", + Log: testutil.Logger{}, }, csv: csv.Parser{ MetricName: "file", @@ -343,6 +351,7 @@ func TestStatefulParsers(t *testing.T) { plugin: &File{ Files: []string{"testdata/mtr-utf-8.csv"}, CharacterEncoding: "", + Log: testutil.Logger{}, }, csv: csv.Parser{ MetricName: "file", @@ -390,6 +399,7 @@ func TestCSVBehavior(t *testing.T) { // Setup the plugin plugin := &File{ Files: []string{filepath.Join("testdata", "csv_behavior_input.csv")}, + Log: testutil.Logger{}, } plugin.SetParserFunc(parserFunc) require.NoError(t, plugin.Init()) diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index ee6145945..c2b74e77c 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -23,6 +23,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + type HTTP struct { URLs []string `toml:"urls"` Method string `toml:"method"` @@ -207,6 +209,12 @@ func (h *HTTP) gatherURL(acc telegraf.Accumulator, url string) error { return fmt.Errorf("parsing metrics failed: %w", err) } + if len(metrics) == 0 { + once.Do(func() { + h.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + for _, metric := range metrics { if !metric.HasTag("url") { metric.AddTag("url", url) diff --git a/plugins/inputs/http_listener_v2/http_listener_v2.go b/plugins/inputs/http_listener_v2/http_listener_v2.go index 134f2edc7..78d9dbfdf 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/choice" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" @@ -27,6 +28,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + // defaultMaxBodySize is the default maximum request body size, in bytes. // if the request body is over this size, we will return an HTTP 413 error. // 500 MB @@ -236,6 +239,12 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) return } + if len(metrics) == 0 { + once.Do(func() { + h.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + for _, m := range metrics { for headerName, measurementName := range h.HTTPHeaderTags { headerValues := req.Header.Get(headerName) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index edbcc97a2..6e9e637bb 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -23,6 +23,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + const ( defaultMaxUndeliveredMessages = 1000 defaultMaxProcessingTime = config.Duration(100 * time.Millisecond) @@ -490,6 +492,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg * return err } + if len(metrics) == 0 { + once.Do(func() { + h.log.Debug(internal.NoMetricsCreatedMsg) + }) + } + headerKey := "" // Check if any message header should override metric name or should be pass as tag if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" { diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer.go b/plugins/inputs/kinesis_consumer/kinesis_consumer.go index 73486f739..1133c961c 100644 --- a/plugins/inputs/kinesis_consumer/kinesis_consumer.go +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer.go @@ -21,6 +21,7 @@ import ( "github.com/harlow/kinesis-consumer/store/ddb" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" internalaws "github.com/influxdata/telegraf/plugins/common/aws" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -28,6 +29,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + type ( DynamoDB struct { AppName string `toml:"app_name"` @@ -180,6 +183,12 @@ func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consume return err } + if len(metrics) == 0 { + once.Do(func() { + k.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + k.recordsTex.Lock() id := acc.AddTrackingMetricGroup(metrics) k.records[id] = *r.SequenceNumber diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index c6cf906e2..901bedce9 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -275,8 +275,7 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { if err != nil || len(metrics) == 0 { if len(metrics) == 0 { once.Do(func() { - const msg = "No metrics were created from a message. Verify your parser settings. This message is only printed once." - m.Log.Debug(msg) + m.Log.Debug(internal.NoMetricsCreatedMsg) }) } diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index d23ad23eb..fc20cb945 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -11,6 +11,7 @@ import ( "github.com/nats-io/nats.go" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -18,6 +19,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + var ( defaultMaxUndeliveredMessages = 1000 ) @@ -220,6 +223,11 @@ func (n *natsConsumer) receiver(ctx context.Context) { <-sem continue } + if len(metrics) == 0 { + once.Do(func() { + n.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } for _, m := range metrics { m.AddTag("subject", msg.Subject) } diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 7d1dc6b3b..b4783afc1 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -5,8 +5,10 @@ package socket_listener import ( _ "embed" "net" + "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/socket" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -14,6 +16,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + type SocketListener struct { ServiceAddress string `toml:"service_address"` Log telegraf.Logger `toml:"-"` @@ -54,6 +58,11 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { acc.AddError(err) return } + if len(metrics) == 0 { + once.Do(func() { + sl.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } for _, m := range metrics { acc.AddMetric(m) } diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index ecb9b9442..f6a71aaf3 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -18,6 +18,7 @@ import ( "github.com/pborman/ansi" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/inputs" @@ -27,6 +28,8 @@ import ( //go:embed sample.conf var sampleConfig string +var once sync.Once + const ( defaultWatchMethod = "inotify" ) @@ -340,7 +343,11 @@ func (t *Tail) receiver(parser telegraf.Parser, tailer *tail.Tail) { tailer.Filename, text, err.Error()) continue } - + if len(metrics) == 0 { + once.Do(func() { + t.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } if t.PathTag != "" { for _, metric := range metrics { metric.AddTag(t.PathTag, tailer.Filename)