From 9744c3a6a2e3cb5b98d38045c4cd4792e8f86a7b Mon Sep 17 00:00:00 2001 From: Sebastian Spaink <3441183+sspaink@users.noreply.github.com> Date: Wed, 6 Jul 2022 15:23:13 -0500 Subject: [PATCH] feat: Migrate influx and influx_upstream parsers to new style (#11432) --- plugins/common/shim/output.go | 5 +- plugins/common/shim/processor_test.go | 5 +- .../inputs/cloud_pubsub/cloud_pubsub_test.go | 17 ++-- .../cloud_pubsub_push_test.go | 8 +- plugins/inputs/execd/execd_test.go | 10 +-- .../http_listener_v2/http_listener_v2_test.go | 89 ++++++++++++------- .../influxdb_v2_listener.go | 17 +++- .../kafka_consumer_legacy_integration_test.go | 7 +- .../kafka_consumer_legacy_test.go | 23 ++--- .../mqtt_consumer/mqtt_consumer_test.go | 7 +- .../inputs/nsq_consumer/nsq_consumer_test.go | 5 +- .../socket_listener/socket_listener_test.go | 48 +++++----- plugins/inputs/tail/tail_test.go | 22 +++-- .../inputs/tcp_listener/tcp_listener_test.go | 57 ++++++------ .../outputs/cloud_pubsub/cloud_pubsub_test.go | 9 +- plugins/outputs/cloud_pubsub/topic_stubbed.go | 6 +- plugins/parsers/all/all.go | 2 + plugins/parsers/dropwizard/parser.go | 10 ++- .../parsers/influx/influx_upstream/parser.go | 45 ++++++---- .../influx/influx_upstream/parser_test.go | 14 ++- plugins/parsers/influx/parser.go | 51 +++++++---- plugins/parsers/influx/parser_test.go | 18 ++-- plugins/parsers/json_v2/parser_test.go | 7 +- plugins/parsers/registry.go | 17 ---- plugins/parsers/xpath/parser_test.go | 3 +- plugins/processors/starlark/starlark_test.go | 7 +- plugins/serializers/csv/csv_test.go | 6 +- 27 files changed, 304 insertions(+), 211 deletions(-) diff --git a/plugins/common/shim/output.go b/plugins/common/shim/output.go index c5ce46da7..2e9c9fc5f 100644 --- a/plugins/common/shim/output.go +++ b/plugins/common/shim/output.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" ) // AddOutput adds the input to the shim. Later calls to Run() will run this. @@ -23,7 +23,8 @@ func (s *Shim) AddOutput(output telegraf.Output) error { } func (s *Shim) RunOutput() error { - parser, err := parsers.NewInfluxParser() + parser := influx.Parser{} + err := parser.Init() if err != nil { return fmt.Errorf("Failed to create new parser: %w", err) } diff --git a/plugins/common/shim/processor_test.go b/plugins/common/shim/processor_test.go index b7b1739ae..9f6046e7b 100644 --- a/plugins/common/shim/processor_test.go +++ b/plugins/common/shim/processor_test.go @@ -12,7 +12,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/serializers" ) @@ -53,7 +53,8 @@ func testSendAndReceive(t *testing.T, fieldKey string, fieldValue string) { }() serializer, _ := serializers.NewInfluxSerializer() - parser, _ := parsers.NewInfluxParser() + parser := influx.Parser{} + require.NoError(t, parser.Init()) m := metric.New("thing", map[string]string{ diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go index e27c1e810..5c5edf455 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -19,7 +19,8 @@ const ( func TestRunParse(t *testing.T) { subID := "sub-run-parse" - testParser, _ := parsers.NewInfluxParser() + testParser := &influx.Parser{} + require.NoError(t, testParser.Init()) sub := &stubSub{ id: subID, @@ -63,7 +64,8 @@ func TestRunParse(t *testing.T) { func TestRunBase64(t *testing.T) { subID := "sub-run-base64" - testParser, _ := parsers.NewInfluxParser() + testParser := &influx.Parser{} + require.NoError(t, testParser.Init()) sub := &stubSub{ id: subID, @@ -107,7 +109,8 @@ func TestRunBase64(t *testing.T) { func TestRunInvalidMessages(t *testing.T) { subID := "sub-invalid-messages" - testParser, _ := parsers.NewInfluxParser() + testParser := &influx.Parser{} + require.NoError(t, testParser.Init()) sub := &stubSub{ id: subID, @@ -154,7 +157,8 @@ func TestRunOverlongMessages(t *testing.T) { acc := &testutil.Accumulator{} - testParser, _ := parsers.NewInfluxParser() + testParser := &influx.Parser{} + require.NoError(t, testParser.Init()) sub := &stubSub{ id: subID, @@ -201,7 +205,8 @@ func TestRunErrorInSubscriber(t *testing.T) { acc := &testutil.Accumulator{} - testParser, _ := parsers.NewInfluxParser() + testParser := &influx.Parser{} + require.NoError(t, testParser.Init()) sub := &stubSub{ id: subID, diff --git a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go index 3c922f2d5..34c314656 100644 --- a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go +++ b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go @@ -17,7 +17,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/models" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -135,10 +135,8 @@ func TestServeHTTP(t *testing.T) { pubPush.sem <- struct{}{} } - p, _ := parsers.NewParser(&parsers.Config{ - MetricName: "cloud_pubsub_push", - DataFormat: "influx", - }) + p := &influx.Parser{} + require.NoError(t, p.Init()) pubPush.SetParser(p) dst := make(chan telegraf.Metric, 1) diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 729db3785..500107e4d 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -16,7 +16,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" ) @@ -42,8 +42,8 @@ func TestSettingConfigWorks(t *testing.T) { } func TestExternalInputWorks(t *testing.T) { - influxParser, err := parsers.NewInfluxParser() - require.NoError(t, err) + influxParser := &influx.Parser{} + require.NoError(t, influxParser.Init()) exe, err := os.Executable() require.NoError(t, err) @@ -76,8 +76,8 @@ func TestExternalInputWorks(t *testing.T) { } func TestParsesLinesContainingNewline(t *testing.T) { - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) metrics := make(chan telegraf.Metric, 10) defer close(metrics) diff --git a/plugins/inputs/http_listener_v2/http_listener_v2_test.go b/plugins/inputs/http_listener_v2/http_listener_v2_test.go index fda803ef2..f08f2cb70 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2_test.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2_test.go @@ -17,8 +17,8 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -45,8 +45,11 @@ var ( pki = testutil.NewPKI("../../../testutil/pki") ) -func newTestHTTPListenerV2() *HTTPListenerV2 { - parser, _ := parsers.NewInfluxParser() +func newTestHTTPListenerV2() (*HTTPListenerV2, error) { + parser := &influx.Parser{} + if err := parser.Init(); err != nil { + return nil, err + } listener := &HTTPListenerV2{ Log: testutil.Logger{}, @@ -59,18 +62,24 @@ func newTestHTTPListenerV2() *HTTPListenerV2 { DataSource: "body", close: make(chan struct{}), } - return listener + return listener, nil } -func newTestHTTPAuthListener() *HTTPListenerV2 { - listener := newTestHTTPListenerV2() +func newTestHTTPAuthListener() (*HTTPListenerV2, error) { + listener, err := newTestHTTPListenerV2() + if err != nil { + return nil, err + } listener.BasicUsername = basicUsername listener.BasicPassword = basicPassword - return listener + return listener, nil } -func newTestHTTPSListenerV2() *HTTPListenerV2 { - parser, _ := parsers.NewInfluxParser() +func newTestHTTPSListenerV2() (*HTTPListenerV2, error) { + parser := &influx.Parser{} + if err := parser.Init(); err != nil { + return nil, err + } listener := &HTTPListenerV2{ Log: testutil.Logger{}, @@ -83,7 +92,7 @@ func newTestHTTPSListenerV2() *HTTPListenerV2 { close: make(chan struct{}), } - return listener + return listener, nil } func getHTTPSClient() *http.Client { @@ -109,7 +118,8 @@ func createURL(listener *HTTPListenerV2, scheme string, path string, rawquery st } func TestInvalidListenerConfig(t *testing.T) { - parser, _ := parsers.NewInfluxParser() + parser := &influx.Parser{} + require.NoError(t, parser.Init()) listener := &HTTPListenerV2{ Log: testutil.Logger{}, @@ -130,7 +140,8 @@ func TestInvalidListenerConfig(t *testing.T) { } func TestWriteHTTPSNoClientAuth(t *testing.T) { - listener := newTestHTTPSListenerV2() + listener, err := newTestHTTPSListenerV2() + require.NoError(t, err) listener.TLSAllowedCACerts = nil acc := &testutil.Accumulator{} @@ -156,7 +167,8 @@ func TestWriteHTTPSNoClientAuth(t *testing.T) { } func TestWriteHTTPSWithClientAuth(t *testing.T) { - listener := newTestHTTPSListenerV2() + listener, err := newTestHTTPSListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -171,7 +183,8 @@ func TestWriteHTTPSWithClientAuth(t *testing.T) { } func TestWriteHTTPBasicAuth(t *testing.T) { - listener := newTestHTTPAuthListener() + listener, err := newTestHTTPAuthListener() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -190,7 +203,8 @@ func TestWriteHTTPBasicAuth(t *testing.T) { } func TestWriteHTTP(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -240,7 +254,8 @@ func TestWriteHTTP(t *testing.T) { // http listener should add request path as configured path_tag func TestWriteHTTPWithPathTag(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) listener.PathTag = true acc := &testutil.Accumulator{} @@ -263,7 +278,8 @@ func TestWriteHTTPWithPathTag(t *testing.T) { // http listener should add request path as configured path_tag (trimming it before) func TestWriteHTTPWithMultiplePaths(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) listener.Paths = []string{"/alternative_write"} listener.PathTag = true @@ -298,7 +314,8 @@ func TestWriteHTTPWithMultiplePaths(t *testing.T) { // http listener should add a newline at the end of the buffer if it's not there func TestWriteHTTPNoNewline(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -319,7 +336,8 @@ func TestWriteHTTPNoNewline(t *testing.T) { } func TestWriteHTTPExactMaxBodySize(t *testing.T) { - parser, _ := parsers.NewInfluxParser() + parser := &influx.Parser{} + require.NoError(t, parser.Init()) listener := &HTTPListenerV2{ Log: testutil.Logger{}, @@ -344,7 +362,8 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) { } func TestWriteHTTPVerySmallMaxBody(t *testing.T) { - parser, _ := parsers.NewInfluxParser() + parser := &influx.Parser{} + require.NoError(t, parser.Init()) listener := &HTTPListenerV2{ Log: testutil.Logger{}, @@ -370,7 +389,8 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) { // test that writing gzipped data works func TestWriteHTTPGzippedData(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -404,7 +424,8 @@ func TestWriteHTTPGzippedData(t *testing.T) { // test that writing snappy data works func TestWriteHTTPSnappyData(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -443,7 +464,8 @@ func TestWriteHTTPHighTraffic(t *testing.T) { if runtime.GOOS == "darwin" { t.Skip("Skipping due to hang on darwin") } - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -479,7 +501,8 @@ func TestWriteHTTPHighTraffic(t *testing.T) { } func TestReceive404ForInvalidEndpoint(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -494,7 +517,8 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) { } func TestWriteHTTPInvalid(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -509,7 +533,8 @@ func TestWriteHTTPInvalid(t *testing.T) { } func TestWriteHTTPEmpty(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) acc := &testutil.Accumulator{} require.NoError(t, listener.Init()) @@ -524,7 +549,8 @@ func TestWriteHTTPEmpty(t *testing.T) { } func TestWriteHTTPTransformHeaderValuesToTagsSingleWrite(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"} acc := &testutil.Accumulator{} @@ -563,7 +589,8 @@ func TestWriteHTTPTransformHeaderValuesToTagsSingleWrite(t *testing.T) { } func TestWriteHTTPTransformHeaderValuesToTagsBulkWrite(t *testing.T) { - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "Present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"} acc := &testutil.Accumulator{} @@ -598,7 +625,8 @@ func TestWriteHTTPQueryParams(t *testing.T) { TagKeys: []string{"tagKey"}, } - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) listener.DataSource = "query" listener.Parser = &parser @@ -625,7 +653,8 @@ func TestWriteHTTPFormData(t *testing.T) { TagKeys: []string{"tagKey"}, } - listener := newTestHTTPListenerV2() + listener, err := newTestHTTPListenerV2() + require.NoError(t, err) listener.Parser = &parser acc := &testutil.Accumulator{} diff --git a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go index 33374c8df..fc8abf0df 100644 --- a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go +++ b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go @@ -249,7 +249,12 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc { var metrics []telegraf.Metric var err error if h.ParserType == "upstream" { - parser := influx_upstream.NewParser() + parser := influx_upstream.Parser{} + err = parser.Init() + if err != ErrEOF && err != nil { + h.Log.Debugf("Error initializing parser: %v", err.Error()) + return + } parser.SetTimeFunc(influx_upstream.TimeFunc(h.timeFunc)) if precisionStr != "" { @@ -259,13 +264,17 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc { metrics, err = parser.Parse(bytes) } else { - metricHandler := influx.NewMetricHandler() - parser := influx.NewParser(metricHandler) + parser := influx.Parser{} + err = parser.Init() + if err != ErrEOF && err != nil { + h.Log.Debugf("Error initializing parser: %v", err.Error()) + return + } parser.SetTimeFunc(h.timeFunc) if precisionStr != "" { precision := getPrecisionMultiplier(precisionStr) - metricHandler.SetTimePrecision(precision) + parser.SetTimePrecision(precision) } metrics, err = parser.Parse(bytes) diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go index 19befb1f6..1a74b2e8b 100644 --- a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go +++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go @@ -8,7 +8,7 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -43,8 +43,9 @@ func TestReadsMetricsFromKafkaIntegration(t *testing.T) { PointBuffer: 100000, Offset: "oldest", } - p, _ := parsers.NewInfluxParser() - k.SetParser(p) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + k.SetParser(parser) // Verify that we can now gather the sent message var acc testutil.Accumulator diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go index 50987fb32..740a9dced 100644 --- a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go +++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go @@ -6,8 +6,8 @@ import ( "github.com/Shopify/sarama" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/graphite" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/testutil" @@ -44,9 +44,10 @@ func TestRunParser(t *testing.T) { k.acc = &acc defer close(k.done) - var err error - k.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + k.parser = parser + go k.receiver() in <- saramaMsg(testMsg) acc.Wait(1) @@ -61,9 +62,10 @@ func TestRunParserInvalidMsg(t *testing.T) { k.acc = &acc defer close(k.done) - var err error - k.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + k.parser = parser + go k.receiver() in <- saramaMsg(invalidMsg) acc.WaitError(1) @@ -95,9 +97,10 @@ func TestRunParserAndGather(t *testing.T) { k.acc = &acc defer close(k.done) - var err error - k.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + k.parser = parser + go k.receiver() in <- saramaMsg(testMsg) acc.Wait(1) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 2eb7d6ffa..da8a376fe 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -8,6 +8,7 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -438,11 +439,11 @@ func TestTopicTag(t *testing.T) { plugin.TopicTag = tt.topicTag() plugin.TopicParsing = tt.topicParsing - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) plugin.SetParser(parser) - err = plugin.Init() + err := plugin.Init() require.Equal(t, tt.expectedError, err) if tt.expectedError != nil { return diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go index 4c6d94474..8f9be0800 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer_test.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go @@ -14,7 +14,7 @@ import ( "github.com/nsqio/go-nsq" "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -49,7 +49,8 @@ func TestReadsMetricsFromNSQ(t *testing.T) { Nsqd: []string{"127.0.0.1:4155"}, } - p, _ := parsers.NewInfluxParser() + p := &influx.Parser{} + require.NoError(t, p.Init()) consumer.SetParser(p) var acc testutil.Accumulator require.Len(t, acc.Metrics, 0, "There should not be any points") diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go index 72c87e3ed..a5a79468f 100644 --- a/plugins/inputs/socket_listener/socket_listener_test.go +++ b/plugins/inputs/socket_listener/socket_listener_test.go @@ -15,7 +15,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" "github.com/influxdata/wlog" ) @@ -50,15 +50,15 @@ func TestSocketListener_tcp_tls(t *testing.T) { defer testEmptyLog() sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "tcp://127.0.0.1:0" sl.ServerConfig = *pki.TLSServerConfig() acc := &testutil.Accumulator{} - err = sl.Start(acc) + err := sl.Start(acc) require.NoError(t, err) defer sl.Stop() @@ -75,15 +75,15 @@ func TestSocketListener_unix_tls(t *testing.T) { sock := testutil.TempSocket(t) sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "unix://" + sock sl.ServerConfig = *pki.TLSServerConfig() acc := &testutil.Accumulator{} - err = sl.Start(acc) + err := sl.Start(acc) require.NoError(t, err) defer sl.Stop() @@ -102,15 +102,15 @@ func TestSocketListener_tcp(t *testing.T) { defer testEmptyLog() sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "tcp://127.0.0.1:0" sl.ReadBufferSize = config.Size(1024) acc := &testutil.Accumulator{} - err = sl.Start(acc) + err := sl.Start(acc) require.NoError(t, err) defer sl.Stop() @@ -125,15 +125,15 @@ func TestSocketListener_udp(t *testing.T) { defer testEmptyLog() sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "udp://127.0.0.1:0" sl.ReadBufferSize = config.Size(1024) acc := &testutil.Accumulator{} - err = sl.Start(acc) + err := sl.Start(acc) require.NoError(t, err) defer sl.Stop() @@ -152,15 +152,15 @@ func TestSocketListener_unix(t *testing.T) { f, _ := os.Create(sock) require.NoError(t, f.Close()) sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "unix://" + sock sl.ReadBufferSize = config.Size(1024) acc := &testutil.Accumulator{} - err = sl.Start(acc) + err := sl.Start(acc) require.NoError(t, err) defer sl.Stop() @@ -185,8 +185,8 @@ func TestSocketListener_unixgram(t *testing.T) { t.Cleanup(func() { require.NoError(t, f.Close()) }) sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "unixgram://" + sock @@ -208,8 +208,8 @@ func TestSocketListenerDecode_tcp(t *testing.T) { defer testEmptyLog() sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "tcp://127.0.0.1:0" @@ -217,7 +217,7 @@ func TestSocketListenerDecode_tcp(t *testing.T) { sl.ContentEncoding = "gzip" acc := &testutil.Accumulator{} - err = sl.Start(acc) + err := sl.Start(acc) require.NoError(t, err) defer sl.Stop() @@ -232,8 +232,8 @@ func TestSocketListenerDecode_udp(t *testing.T) { defer testEmptyLog() sl := &SocketListener{} - parser, err := parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) sl.SetParser(parser) sl.Log = testutil.Logger{} sl.ServiceAddress = "udp://127.0.0.1:0" @@ -241,7 +241,7 @@ func TestSocketListenerDecode_udp(t *testing.T) { sl.ContentEncoding = "gzip" acc := &testutil.Accumulator{} - err = sl.Start(acc) + err := sl.Start(acc) require.NoError(t, err) defer sl.Stop() diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 44f11fab6..a66863da0 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -25,6 +25,15 @@ var ( testdataDir = getTestdataDir() ) +func NewInfluxParser() (parsers.Parser, error) { + parser := &influx.Parser{} + err := parser.Init() + if err != nil { + return nil, err + } + return parser, nil +} + func NewTestTail() *Tail { offsetsMutex.Lock() offsetsCopy := make(map[string]int64, len(offsets)) @@ -68,7 +77,7 @@ func TestTailBadLine(t *testing.T) { tt.Log = testutil.Logger{} tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(parsers.NewInfluxParser) + tt.SetParserFunc(NewInfluxParser) err = tt.Init() require.NoError(t, err) @@ -97,7 +106,7 @@ func TestColoredLine(t *testing.T) { tt.FromBeginning = true tt.Filters = []string{"ansi_color"} tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(parsers.NewInfluxParser) + tt.SetParserFunc(NewInfluxParser) err = tt.Init() require.NoError(t, err) @@ -130,7 +139,7 @@ func TestTailDosLineEndings(t *testing.T) { tt.Log = testutil.Logger{} tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(parsers.NewInfluxParser) + tt.SetParserFunc(NewInfluxParser) err = tt.Init() require.NoError(t, err) @@ -607,10 +616,7 @@ func TestCharacterEncoding(t *testing.T) { WatchMethod: watchMethod, } - plugin.SetParserFunc(func() (parsers.Parser, error) { - handler := influx.NewMetricHandler() - return influx.NewParser(handler), nil - }) + plugin.SetParserFunc(NewInfluxParser) if tt.offset != 0 { plugin.offsets = map[string]int64{ @@ -650,7 +656,7 @@ func TestTailEOF(t *testing.T) { tt.Log = testutil.Logger{} tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(parsers.NewInfluxParser) + tt.SetParserFunc(NewInfluxParser) err = tt.Init() require.NoError(t, err) diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go index a9f4707a1..ddd2dcabb 100644 --- a/plugins/inputs/tcp_listener/tcp_listener_test.go +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -8,8 +8,8 @@ import ( "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/graphite" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/testutil" ) @@ -47,9 +47,10 @@ func BenchmarkTCP(b *testing.B) { AllowedPendingMessages: 100000, MaxTCPConnections: 250, } - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(b, err) + parser := &influx.Parser{} + require.NoError(b, parser.Init()) + listener.parser = parser + acc := &testutil.Accumulator{Discard: true} // send multiple messages to socket @@ -79,9 +80,9 @@ func TestHighTrafficTCP(t *testing.T) { AllowedPendingMessages: 100000, MaxTCPConnections: 250, } - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + listener.parser = parser acc := &testutil.Accumulator{} // send multiple messages to socket @@ -109,9 +110,9 @@ func TestConnectTCP(t *testing.T) { AllowedPendingMessages: 10000, MaxTCPConnections: 250, } - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + listener.parser = parser acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) @@ -151,15 +152,15 @@ func TestConcurrentConns(t *testing.T) { AllowedPendingMessages: 10000, MaxTCPConnections: 2, } - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + listener.parser = parser acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) defer listener.Stop() - _, err = net.Dial("tcp", "127.0.0.1:8195") + _, err := net.Dial("tcp", "127.0.0.1:8195") require.NoError(t, err) _, err = net.Dial("tcp", "127.0.0.1:8195") require.NoError(t, err) @@ -190,15 +191,15 @@ func TestConcurrentConns1(t *testing.T) { AllowedPendingMessages: 10000, MaxTCPConnections: 1, } - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + listener.parser = parser acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) defer listener.Stop() - _, err = net.Dial("tcp", "127.0.0.1:8196") + _, err := net.Dial("tcp", "127.0.0.1:8196") require.NoError(t, err) // Connection over the limit: @@ -227,14 +228,14 @@ func TestCloseConcurrentConns(t *testing.T) { AllowedPendingMessages: 10000, MaxTCPConnections: 2, } - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + listener.parser = parser acc := &testutil.Accumulator{} require.NoError(t, listener.Start(acc)) - _, err = net.Dial("tcp", "127.0.0.1:8195") + _, err := net.Dial("tcp", "127.0.0.1:8195") require.NoError(t, err) _, err = net.Dial("tcp", "127.0.0.1:8195") require.NoError(t, err) @@ -250,9 +251,9 @@ func TestRunParser(t *testing.T) { listener.acc = &acc defer close(listener.done) - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + listener.parser = parser listener.wg.Add(1) go listener.tcpParser() @@ -273,9 +274,9 @@ func TestRunParserInvalidMsg(t *testing.T) { listener.Log = &testutil.CaptureLogger{} listener.acc = &testutil.Accumulator{} - var err error - listener.parser, err = parsers.NewInfluxParser() - require.NoError(t, err) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + listener.parser = parser listener.wg.Add(1) go listener.tcpParser() diff --git a/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go b/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go index e342acac4..ebe491efd 100644 --- a/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go +++ b/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -168,8 +168,11 @@ func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[str } func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message { - p, _ := parsers.NewInfluxParser() - + p := influx.Parser{} + err := p.Init() + if err != nil { + t.Fatalf("unexpected parsing error: %v", err) + } v, _ := m.GetField("value") psMsg, ok := published[v.(string)] if !ok { diff --git a/plugins/outputs/cloud_pubsub/topic_stubbed.go b/plugins/outputs/cloud_pubsub/topic_stubbed.go index c66e573a6..e8e24683c 100644 --- a/plugins/outputs/cloud_pubsub/topic_stubbed.go +++ b/plugins/outputs/cloud_pubsub/topic_stubbed.go @@ -15,7 +15,9 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/serializers" + "github.com/stretchr/testify/require" "google.golang.org/api/support/bundler" ) @@ -178,7 +180,9 @@ func (t *stubTopic) sendBundle() func(items interface{}) { } func (t *stubTopic) parseIDs(msg *pubsub.Message) []string { - p, _ := parsers.NewInfluxParser() + p := influx.Parser{} + err := p.Init() + require.NoError(t, err) metrics, err := p.Parse(msg.Data) if err != nil { // Just attempt to base64-decode first before returning error. diff --git a/plugins/parsers/all/all.go b/plugins/parsers/all/all.go index 8a108a3d8..145f8ac86 100644 --- a/plugins/parsers/all/all.go +++ b/plugins/parsers/all/all.go @@ -8,6 +8,8 @@ import ( _ "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded" _ "github.com/influxdata/telegraf/plugins/parsers/graphite" _ "github.com/influxdata/telegraf/plugins/parsers/grok" + _ "github.com/influxdata/telegraf/plugins/parsers/influx" + _ "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" _ "github.com/influxdata/telegraf/plugins/parsers/json" _ "github.com/influxdata/telegraf/plugins/parsers/json_v2" _ "github.com/influxdata/telegraf/plugins/parsers/logfmt" diff --git a/plugins/parsers/dropwizard/parser.go b/plugins/parsers/dropwizard/parser.go index d4f50f33f..f30240579 100644 --- a/plugins/parsers/dropwizard/parser.go +++ b/plugins/parsers/dropwizard/parser.go @@ -206,8 +206,14 @@ func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []te } func (p *Parser) Init() error { - handler := influx.NewMetricHandler() - p.seriesParser = influx.NewSeriesParser(handler) + parser := &influx.Parser{ + Type: "series", + } + err := parser.Init() + if err != nil { + return err + } + p.seriesParser = parser if len(p.Templates) != 0 { defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*") diff --git a/plugins/parsers/influx/influx_upstream/parser.go b/plugins/parsers/influx/influx_upstream/parser.go index 153407a0e..ca221e547 100644 --- a/plugins/parsers/influx/influx_upstream/parser.go +++ b/plugins/parsers/influx/influx_upstream/parser.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" ) const ( @@ -101,30 +102,15 @@ func convertToParseError(input []byte, rawErr error) error { // Parser is an InfluxDB Line Protocol parser that implements the // parsers.Parser interface. type Parser struct { - DefaultTags map[string]string + DefaultTags map[string]string `toml:"-"` + // If set to "series" a series machine will be initialized, defaults to regular machine + Type string `toml:"-"` defaultTime TimeFunc precision lineprotocol.Precision allowPartial bool } -// NewParser returns a Parser than accepts line protocol -func NewParser() *Parser { - return &Parser{ - defaultTime: time.Now, - precision: lineprotocol.Nanosecond, - } -} - -// NewSeriesParser returns a Parser than accepts a measurement and tagset -func NewSeriesParser() *Parser { - return &Parser{ - defaultTime: time.Now, - precision: lineprotocol.Nanosecond, - allowPartial: true, - } -} - func (p *Parser) SetTimeFunc(f TimeFunc) { p.defaultTime = f } @@ -193,6 +179,29 @@ func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) { } } +func (p *Parser) Init() error { + p.defaultTime = time.Now + p.precision = lineprotocol.Nanosecond + p.allowPartial = p.Type == "series" + + return nil +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (p *Parser) InitFromConfig(config *parsers.Config) error { + p.DefaultTags = config.DefaultTags + + return p.Init() +} + +func init() { + parsers.Add("influx_upstream", + func(_ string) telegraf.Parser { + return &Parser{} + }, + ) +} + // StreamParser is an InfluxDB Line Protocol parser. It is not safe for // concurrent use in multiple goroutines. type StreamParser struct { diff --git a/plugins/parsers/influx/influx_upstream/parser_test.go b/plugins/parsers/influx/influx_upstream/parser_test.go index 686aceb22..dfcbe4d49 100644 --- a/plugins/parsers/influx/influx_upstream/parser_test.go +++ b/plugins/parsers/influx/influx_upstream/parser_test.go @@ -594,7 +594,8 @@ func parseTests(stream bool) []parseTest { func TestParser(t *testing.T) { for _, tt := range parseTests(false) { t.Run(tt.name, func(t *testing.T) { - parser := NewParser() + parser := Parser{} + require.NoError(t, parser.Init()) parser.SetTimeFunc(DefaultTime) if tt.timeFunc != nil { parser.SetTimeFunc(tt.timeFunc) @@ -621,7 +622,8 @@ func TestParser(t *testing.T) { func BenchmarkParser(b *testing.B) { for _, tt := range parseTests(false) { b.Run(tt.name, func(b *testing.B) { - parser := NewParser() + parser := Parser{} + require.NoError(b, parser.Init()) for n := 0; n < b.N; n++ { metrics, err := parser.Parse(tt.input) _ = err @@ -728,7 +730,10 @@ func TestSeriesParser(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - parser := NewSeriesParser() + parser := Parser{ + Type: "series", + } + require.NoError(t, parser.Init()) if tt.timeFunc != nil { parser.SetTimeFunc(tt.timeFunc) } @@ -778,7 +783,8 @@ func TestParserErrorString(t *testing.T) { for _, tt := range ptests { t.Run(tt.name, func(t *testing.T) { - parser := NewParser() + parser := Parser{} + require.NoError(t, parser.Init()) _, err := parser.Parse(tt.input) require.Equal(t, tt.errString, err.Error()) diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index adc89f407..c1afc0ac2 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" ) const ( @@ -59,33 +60,23 @@ func (e *ParseError) Error() string { // Parser is an InfluxDB Line Protocol parser that implements the // parsers.Parser interface. type Parser struct { - DefaultTags map[string]string + DefaultTags map[string]string `toml:"-"` + // If set to "series" a series machine will be initialized, defaults to regular machine + Type string `toml:"-"` sync.Mutex *machine handler *MetricHandler } -// NewParser returns a Parser than accepts line protocol -func NewParser(handler *MetricHandler) *Parser { - return &Parser{ - machine: NewMachine(handler), - handler: handler, - } -} - -// NewSeriesParser returns a Parser than accepts a measurement and tagset -func NewSeriesParser(handler *MetricHandler) *Parser { - return &Parser{ - machine: NewSeriesMachine(handler), - handler: handler, - } -} - func (p *Parser) SetTimeFunc(f TimeFunc) { p.handler.SetTimeFunc(f) } +func (p *Parser) SetTimePrecision(u time.Duration) { + p.handler.SetTimePrecision(u) +} + func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) { p.Lock() defer p.Unlock() @@ -160,6 +151,32 @@ func (p *Parser) applyDefaultTagsSingle(metric telegraf.Metric) { } } +func (p *Parser) Init() error { + p.handler = NewMetricHandler() + if p.Type == "series" { + p.machine = NewSeriesMachine(p.handler) + } else { + p.machine = NewMachine(p.handler) + } + + return nil +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (p *Parser) InitFromConfig(config *parsers.Config) error { + p.DefaultTags = config.DefaultTags + + return p.Init() +} + +func init() { + parsers.Add("influx", + func(_ string) telegraf.Parser { + return &Parser{} + }, + ) +} + // StreamParser is an InfluxDB Line Protocol parser. It is not safe for // concurrent use in multiple goroutines. type StreamParser struct { diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index 422736b38..3128d7888 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -568,8 +568,8 @@ var ptests = []struct { func TestParser(t *testing.T) { for _, tt := range ptests { t.Run(tt.name, func(t *testing.T) { - handler := NewMetricHandler() - parser := NewParser(handler) + parser := Parser{} + require.NoError(t, parser.Init()) parser.SetTimeFunc(DefaultTime) if tt.timeFunc != nil { parser.SetTimeFunc(tt.timeFunc) @@ -592,8 +592,8 @@ func TestParser(t *testing.T) { func BenchmarkParser(b *testing.B) { for _, tt := range ptests { b.Run(tt.name, func(b *testing.B) { - handler := NewMetricHandler() - parser := NewParser(handler) + parser := Parser{} + require.NoError(b, parser.Init()) for n := 0; n < b.N; n++ { metrics, err := parser.Parse(tt.input) _ = err @@ -698,8 +698,10 @@ func TestSeriesParser(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - handler := NewMetricHandler() - parser := NewSeriesParser(handler) + parser := Parser{ + Type: "series", + } + require.NoError(t, parser.Init()) if tt.timeFunc != nil { parser.SetTimeFunc(tt.timeFunc) } @@ -749,8 +751,8 @@ func TestParserErrorString(t *testing.T) { for _, tt := range ptests { t.Run(tt.name, func(t *testing.T) { - handler := NewMetricHandler() - parser := NewParser(handler) + parser := Parser{} + require.NoError(t, parser.Init()) _, err := parser.Parse(tt.input) require.Equal(t, tt.errString, err.Error()) diff --git a/plugins/parsers/json_v2/parser_test.go b/plugins/parsers/json_v2/parser_test.go index 51780bcd6..f7b142b2a 100644 --- a/plugins/parsers/json_v2/parser_test.go +++ b/plugins/parsers/json_v2/parser_test.go @@ -68,7 +68,7 @@ func TestMultipleConfigs(t *testing.T) { } // Process expected metrics and compare with resulting metrics - expectedOutputs, err := readMetricFile(fmt.Sprintf("testdata/%s/expected.out", f.Name())) + expectedOutputs, err := readMetricFile(t, fmt.Sprintf("testdata/%s/expected.out", f.Name())) require.NoError(t, err) resultingMetrics := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expectedOutputs, resultingMetrics, testutil.IgnoreTime()) @@ -86,7 +86,7 @@ func TestMultipleConfigs(t *testing.T) { } } -func readMetricFile(path string) ([]telegraf.Metric, error) { +func readMetricFile(t *testing.T, path string) ([]telegraf.Metric, error) { var metrics []telegraf.Metric expectedFile, err := os.Open(path) if err != nil { @@ -94,7 +94,8 @@ func readMetricFile(path string) ([]telegraf.Metric, error) { } defer expectedFile.Close() - parser := influx.NewParser(influx.NewMetricHandler()) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) scanner := bufio.NewScanner(expectedFile) for scanner.Scan() { line := scanner.Text() diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 21df33711..04437f278 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -4,8 +4,6 @@ import ( "fmt" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" "github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite" "github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2" "github.com/influxdata/telegraf/plugins/parsers/temporary/xpath" @@ -195,12 +193,6 @@ func NewParser(config *Config) (Parser, error) { var err error var parser Parser switch config.DataFormat { - case "influx": - if config.InfluxParserType == "upstream" { - parser, err = NewInfluxUpstreamParser() - } else { - parser, err = NewInfluxParser() - } case "prometheusremotewrite": parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags) default: @@ -221,15 +213,6 @@ func NewParser(config *Config) (Parser, error) { return parser, err } -func NewInfluxParser() (Parser, error) { - handler := influx.NewMetricHandler() - return influx.NewParser(handler), nil -} - -func NewInfluxUpstreamParser() (Parser, error) { - return influx_upstream.NewParser(), nil -} - func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) { return &prometheusremotewrite.Parser{ DefaultTags: defaultTags, diff --git a/plugins/parsers/xpath/parser_test.go b/plugins/parsers/xpath/parser_test.go index 94936a065..9b1e7a163 100644 --- a/plugins/parsers/xpath/parser_test.go +++ b/plugins/parsers/xpath/parser_test.go @@ -1302,7 +1302,8 @@ func TestTestCases(t *testing.T) { }, } - parser := influx.NewParser(influx.NewMetricHandler()) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index df3e53f6c..15e74cd9a 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -17,7 +17,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" common "github.com/influxdata/telegraf/plugins/common/starlark" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -3258,10 +3258,11 @@ func TestAllScriptTestData(t *testing.T) { } } -var parser, _ = parsers.NewInfluxParser() // literally never returns errors. - // parses metric lines out of line protocol following a header, with a trailing blank line func parseMetricsFrom(t *testing.T, lines []string, header string) (metrics []telegraf.Metric) { + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + require.NotZero(t, len(lines), "Expected some lines to parse from .star file, found none") startIdx := -1 endIdx := len(lines) diff --git a/plugins/serializers/csv/csv_test.go b/plugins/serializers/csv/csv_test.go index f0e10ba5a..fa27a2859 100644 --- a/plugins/serializers/csv/csv_test.go +++ b/plugins/serializers/csv/csv_test.go @@ -60,7 +60,8 @@ func TestSerializeTransformationNonBatch(t *testing.T) { filename: "testcases/semicolon.conf", }, } - parser := influx.NewParser(influx.NewMetricHandler()) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -125,7 +126,8 @@ func TestSerializeTransformationBatch(t *testing.T) { filename: "testcases/semicolon.conf", }, } - parser := influx.NewParser(influx.NewMetricHandler()) + parser := &influx.Parser{} + require.NoError(t, parser.Init()) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {