chore: Print debug message when no messages generated from parse (#15463)

This commit is contained in:
Joshua Powers 2024-06-05 14:22:55 -06:00 committed by GitHub
parent 71718fba7d
commit bcafb1d24d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 145 additions and 7 deletions

View File

@ -24,6 +24,7 @@ import (
)
const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
const NoMetricsCreatedMsg = "No metrics were created from a message. Verify your parser settings. This message is only printed once."
var once sync.Once

View File

@ -23,6 +23,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
type empty struct{}
type semaphore chan empty
@ -441,6 +443,11 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
onError()
return err
}
if len(metrics) == 0 {
once.Do(func() {
a.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
id := acc.AddTrackingMetricGroup(metrics)
a.deliveries[id] = d

View File

@ -23,6 +23,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
type empty struct{}
type semaphore chan empty
@ -190,6 +192,11 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
if len(metrics) == 0 {
msg.Ack()
once.Do(func() {
ps.Log.Debug(internal.NoMetricsCreatedMsg)
})
return nil
}

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -21,6 +22,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
// defaultMaxBodySize is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
// 500 MB
@ -198,6 +201,12 @@ func (p *PubSubPush) serveWrite(res http.ResponseWriter, req *http.Request) {
return
}
if len(metrics) == 0 {
once.Do(func() {
p.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if p.AddMeta {
for i := range metrics {
for k, v := range payload.Msg.Atts {

View File

@ -21,6 +21,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
@ -30,6 +31,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
var (
defaultFilesToMonitor = []string{}
defaultFilesToIgnore = []string{}
@ -306,6 +309,12 @@ func (monitor *DirectoryMonitor) parseMetrics(parser telegraf.Parser, line []byt
return nil, err
}
if len(metrics) == 0 {
once.Do(func() {
monitor.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))

View File

@ -20,6 +20,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
const (
defaultMaxUndeliveredMessages = 1000
)
@ -268,6 +270,12 @@ func (e *EventHub) createMetrics(event *eventhubClient.Event) ([]telegraf.Metric
return nil, err
}
if len(metrics) == 0 {
once.Do(func() {
e.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
for i := range metrics {
for _, field := range e.ApplicationPropertyFields {
if val, ok := event.Get(field); ok {

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
@ -23,6 +24,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
const MaxStderrBytes int = 512
type exitcodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric
@ -118,6 +121,12 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync
return
}
if len(metrics) == 0 {
once.Do(func() {
e.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if e.exitcodeHandler != nil {
metrics = e.exitcodeHandler(metrics, runErr, errBuf)
}

View File

@ -9,10 +9,12 @@ import (
"io"
"os"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/inputs"
@ -22,6 +24,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
@ -102,6 +106,12 @@ func (e *Execd) cmdReadOut(out io.Reader) {
e.acc.AddError(fmt.Errorf("parse error: %w", err))
}
if len(metrics) == 0 {
once.Do(func() {
e.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
for _, metric := range metrics {
e.acc.AddMetric(metric)
}

View File

@ -7,10 +7,12 @@ import (
"io"
"os"
"path/filepath"
"sync"
"github.com/dimchansky/utfbom"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/common/encoding"
"github.com/influxdata/telegraf/plugins/inputs"
@ -19,11 +21,14 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
type File struct {
Files []string `toml:"files"`
FileTag string `toml:"file_tag"`
FilePathTag string `toml:"file_path_tag"`
CharacterEncoding string `toml:"character_encoding"`
Files []string `toml:"files"`
FileTag string `toml:"file_tag"`
FilePathTag string `toml:"file_path_tag"`
CharacterEncoding string `toml:"character_encoding"`
Log telegraf.Logger `toml:"-"`
parserFunc telegraf.ParserFunc
filenames []string
@ -108,6 +113,12 @@ func (f *File) readMetric(filename string) ([]telegraf.Metric, error) {
if err != nil {
return metrics, fmt.Errorf("could not parse %q: %w", filename, err)
}
if len(metrics) == 0 {
once.Do(func() {
f.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
return metrics, err
}

View File

@ -28,6 +28,7 @@ func TestRefreshFilePaths(t *testing.T) {
r := File{
Files: []string{filepath.Join(wd, "dev", "testfiles", "**.log")},
Log: testutil.Logger{},
}
err = r.Init()
require.NoError(t, err)
@ -45,6 +46,7 @@ func TestFileTag(t *testing.T) {
Files: []string{filepath.Join(wd, "dev", "testfiles", "json_a.log")},
FileTag: "filename",
FilePathTag: "filepath",
Log: testutil.Logger{},
}
require.NoError(t, r.Init())
@ -70,6 +72,7 @@ func TestJSONParserCompile(t *testing.T) {
wd, _ := os.Getwd()
r := File{
Files: []string{filepath.Join(wd, "dev", "testfiles", "json_a.log")},
Log: testutil.Logger{},
}
require.NoError(t, r.Init())
@ -89,6 +92,7 @@ func TestGrokParser(t *testing.T) {
var acc testutil.Accumulator
r := File{
Files: []string{filepath.Join(wd, "dev", "testfiles", "grok_a.log")},
Log: testutil.Logger{},
}
err := r.Init()
require.NoError(t, err)
@ -191,6 +195,7 @@ func TestCharacterEncoding(t *testing.T) {
plugin: &File{
Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "",
Log: testutil.Logger{},
},
csv: csv.Parser{
MetricName: "file",
@ -204,6 +209,7 @@ func TestCharacterEncoding(t *testing.T) {
plugin: &File{
Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "utf-8",
Log: testutil.Logger{},
},
csv: csv.Parser{
MetricName: "file",
@ -217,6 +223,7 @@ func TestCharacterEncoding(t *testing.T) {
plugin: &File{
Files: []string{"testdata/mtr-utf-16le.csv"},
CharacterEncoding: "utf-16le",
Log: testutil.Logger{},
},
csv: csv.Parser{
MetricName: "file",
@ -230,6 +237,7 @@ func TestCharacterEncoding(t *testing.T) {
plugin: &File{
Files: []string{"testdata/mtr-utf-16be.csv"},
CharacterEncoding: "utf-16be",
Log: testutil.Logger{},
},
csv: csv.Parser{
MetricName: "file",
@ -343,6 +351,7 @@ func TestStatefulParsers(t *testing.T) {
plugin: &File{
Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "",
Log: testutil.Logger{},
},
csv: csv.Parser{
MetricName: "file",
@ -390,6 +399,7 @@ func TestCSVBehavior(t *testing.T) {
// Setup the plugin
plugin := &File{
Files: []string{filepath.Join("testdata", "csv_behavior_input.csv")},
Log: testutil.Logger{},
}
plugin.SetParserFunc(parserFunc)
require.NoError(t, plugin.Init())

View File

@ -23,6 +23,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
type HTTP struct {
URLs []string `toml:"urls"`
Method string `toml:"method"`
@ -207,6 +209,12 @@ func (h *HTTP) gatherURL(acc telegraf.Accumulator, url string) error {
return fmt.Errorf("parsing metrics failed: %w", err)
}
if len(metrics) == 0 {
once.Do(func() {
h.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
for _, metric := range metrics {
if !metric.HasTag("url") {
metric.AddTag("url", url)

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
@ -27,6 +28,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
// defaultMaxBodySize is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error.
// 500 MB
@ -236,6 +239,12 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
return
}
if len(metrics) == 0 {
once.Do(func() {
h.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
for _, m := range metrics {
for headerName, measurementName := range h.HTTPHeaderTags {
headerValues := req.Header.Get(headerName)

View File

@ -23,6 +23,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
const (
defaultMaxUndeliveredMessages = 1000
defaultMaxProcessingTime = config.Duration(100 * time.Millisecond)
@ -490,6 +492,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
return err
}
if len(metrics) == 0 {
once.Do(func() {
h.log.Debug(internal.NoMetricsCreatedMsg)
})
}
headerKey := ""
// Check if any message header should override metric name or should be pass as tag
if len(h.MsgHeadersToTags) > 0 || h.MsgHeaderToMetricName != "" {

View File

@ -21,6 +21,7 @@ import (
"github.com/harlow/kinesis-consumer/store/ddb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/plugins/common/aws"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -28,6 +29,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
type (
DynamoDB struct {
AppName string `toml:"app_name"`
@ -180,6 +183,12 @@ func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consume
return err
}
if len(metrics) == 0 {
once.Do(func() {
k.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
k.recordsTex.Lock()
id := acc.AddTrackingMetricGroup(metrics)
k.records[id] = *r.SequenceNumber

View File

@ -275,8 +275,7 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
if err != nil || len(metrics) == 0 {
if len(metrics) == 0 {
once.Do(func() {
const msg = "No metrics were created from a message. Verify your parser settings. This message is only printed once."
m.Log.Debug(msg)
m.Log.Debug(internal.NoMetricsCreatedMsg)
})
}

View File

@ -11,6 +11,7 @@ import (
"github.com/nats-io/nats.go"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -18,6 +19,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
var (
defaultMaxUndeliveredMessages = 1000
)
@ -220,6 +223,11 @@ func (n *natsConsumer) receiver(ctx context.Context) {
<-sem
continue
}
if len(metrics) == 0 {
once.Do(func() {
n.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
for _, m := range metrics {
m.AddTag("subject", msg.Subject)
}

View File

@ -5,8 +5,10 @@ package socket_listener
import (
_ "embed"
"net"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/socket"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -14,6 +16,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
type SocketListener struct {
ServiceAddress string `toml:"service_address"`
Log telegraf.Logger `toml:"-"`
@ -54,6 +58,11 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
acc.AddError(err)
return
}
if len(metrics) == 0 {
once.Do(func() {
sl.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
for _, m := range metrics {
acc.AddMetric(m)
}

View File

@ -18,6 +18,7 @@ import (
"github.com/pborman/ansi"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/common/encoding"
"github.com/influxdata/telegraf/plugins/inputs"
@ -27,6 +28,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
const (
defaultWatchMethod = "inotify"
)
@ -340,7 +343,11 @@ func (t *Tail) receiver(parser telegraf.Parser, tailer *tail.Tail) {
tailer.Filename, text, err.Error())
continue
}
if len(metrics) == 0 {
once.Do(func() {
t.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if t.PathTag != "" {
for _, metric := range metrics {
metric.AddTag(t.PathTag, tailer.Filename)