feat: Migrate influx and influx_upstream parsers to new style (#11432)

This commit is contained in:
Sebastian Spaink 2022-07-06 15:23:13 -05:00 committed by GitHub
parent da5fdad292
commit 9744c3a6a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 304 additions and 211 deletions

View File

@ -5,7 +5,7 @@ import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
)
// AddOutput adds the input to the shim. Later calls to Run() will run this.
@ -23,7 +23,8 @@ func (s *Shim) AddOutput(output telegraf.Output) error {
}
func (s *Shim) RunOutput() error {
parser, err := parsers.NewInfluxParser()
parser := influx.Parser{}
err := parser.Init()
if err != nil {
return fmt.Errorf("Failed to create new parser: %w", err)
}

View File

@ -12,7 +12,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
)
@ -53,7 +53,8 @@ func testSendAndReceive(t *testing.T, fieldKey string, fieldValue string) {
}()
serializer, _ := serializers.NewInfluxSerializer()
parser, _ := parsers.NewInfluxParser()
parser := influx.Parser{}
require.NoError(t, parser.Init())
m := metric.New("thing",
map[string]string{

View File

@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -19,7 +19,8 @@ const (
func TestRunParse(t *testing.T) {
subID := "sub-run-parse"
testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
@ -63,7 +64,8 @@ func TestRunParse(t *testing.T) {
func TestRunBase64(t *testing.T) {
subID := "sub-run-base64"
testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
@ -107,7 +109,8 @@ func TestRunBase64(t *testing.T) {
func TestRunInvalidMessages(t *testing.T) {
subID := "sub-invalid-messages"
testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
@ -154,7 +157,8 @@ func TestRunOverlongMessages(t *testing.T) {
acc := &testutil.Accumulator{}
testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
@ -201,7 +205,8 @@ func TestRunErrorInSubscriber(t *testing.T) {
acc := &testutil.Accumulator{}
testParser, _ := parsers.NewInfluxParser()
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,

View File

@ -17,7 +17,7 @@ import (
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -135,10 +135,8 @@ func TestServeHTTP(t *testing.T) {
pubPush.sem <- struct{}{}
}
p, _ := parsers.NewParser(&parsers.Config{
MetricName: "cloud_pubsub_push",
DataFormat: "influx",
})
p := &influx.Parser{}
require.NoError(t, p.Init())
pubPush.SetParser(p)
dst := make(chan telegraf.Metric, 1)

View File

@ -16,7 +16,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
@ -42,8 +42,8 @@ func TestSettingConfigWorks(t *testing.T) {
}
func TestExternalInputWorks(t *testing.T) {
influxParser, err := parsers.NewInfluxParser()
require.NoError(t, err)
influxParser := &influx.Parser{}
require.NoError(t, influxParser.Init())
exe, err := os.Executable()
require.NoError(t, err)
@ -76,8 +76,8 @@ func TestExternalInputWorks(t *testing.T) {
}
func TestParsesLinesContainingNewline(t *testing.T) {
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
metrics := make(chan telegraf.Metric, 10)
defer close(metrics)

View File

@ -17,8 +17,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -45,8 +45,11 @@ var (
pki = testutil.NewPKI("../../../testutil/pki")
)
func newTestHTTPListenerV2() *HTTPListenerV2 {
parser, _ := parsers.NewInfluxParser()
func newTestHTTPListenerV2() (*HTTPListenerV2, error) {
parser := &influx.Parser{}
if err := parser.Init(); err != nil {
return nil, err
}
listener := &HTTPListenerV2{
Log: testutil.Logger{},
@ -59,18 +62,24 @@ func newTestHTTPListenerV2() *HTTPListenerV2 {
DataSource: "body",
close: make(chan struct{}),
}
return listener
return listener, nil
}
func newTestHTTPAuthListener() *HTTPListenerV2 {
listener := newTestHTTPListenerV2()
func newTestHTTPAuthListener() (*HTTPListenerV2, error) {
listener, err := newTestHTTPListenerV2()
if err != nil {
return nil, err
}
listener.BasicUsername = basicUsername
listener.BasicPassword = basicPassword
return listener
return listener, nil
}
func newTestHTTPSListenerV2() *HTTPListenerV2 {
parser, _ := parsers.NewInfluxParser()
func newTestHTTPSListenerV2() (*HTTPListenerV2, error) {
parser := &influx.Parser{}
if err := parser.Init(); err != nil {
return nil, err
}
listener := &HTTPListenerV2{
Log: testutil.Logger{},
@ -83,7 +92,7 @@ func newTestHTTPSListenerV2() *HTTPListenerV2 {
close: make(chan struct{}),
}
return listener
return listener, nil
}
func getHTTPSClient() *http.Client {
@ -109,7 +118,8 @@ func createURL(listener *HTTPListenerV2, scheme string, path string, rawquery st
}
func TestInvalidListenerConfig(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener := &HTTPListenerV2{
Log: testutil.Logger{},
@ -130,7 +140,8 @@ func TestInvalidListenerConfig(t *testing.T) {
}
func TestWriteHTTPSNoClientAuth(t *testing.T) {
listener := newTestHTTPSListenerV2()
listener, err := newTestHTTPSListenerV2()
require.NoError(t, err)
listener.TLSAllowedCACerts = nil
acc := &testutil.Accumulator{}
@ -156,7 +167,8 @@ func TestWriteHTTPSNoClientAuth(t *testing.T) {
}
func TestWriteHTTPSWithClientAuth(t *testing.T) {
listener := newTestHTTPSListenerV2()
listener, err := newTestHTTPSListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -171,7 +183,8 @@ func TestWriteHTTPSWithClientAuth(t *testing.T) {
}
func TestWriteHTTPBasicAuth(t *testing.T) {
listener := newTestHTTPAuthListener()
listener, err := newTestHTTPAuthListener()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -190,7 +203,8 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
}
func TestWriteHTTP(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -240,7 +254,8 @@ func TestWriteHTTP(t *testing.T) {
// http listener should add request path as configured path_tag
func TestWriteHTTPWithPathTag(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.PathTag = true
acc := &testutil.Accumulator{}
@ -263,7 +278,8 @@ func TestWriteHTTPWithPathTag(t *testing.T) {
// http listener should add request path as configured path_tag (trimming it before)
func TestWriteHTTPWithMultiplePaths(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.Paths = []string{"/alternative_write"}
listener.PathTag = true
@ -298,7 +314,8 @@ func TestWriteHTTPWithMultiplePaths(t *testing.T) {
// http listener should add a newline at the end of the buffer if it's not there
func TestWriteHTTPNoNewline(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -319,7 +336,8 @@ func TestWriteHTTPNoNewline(t *testing.T) {
}
func TestWriteHTTPExactMaxBodySize(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener := &HTTPListenerV2{
Log: testutil.Logger{},
@ -344,7 +362,8 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
}
func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
parser, _ := parsers.NewInfluxParser()
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener := &HTTPListenerV2{
Log: testutil.Logger{},
@ -370,7 +389,8 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
// test that writing gzipped data works
func TestWriteHTTPGzippedData(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -404,7 +424,8 @@ func TestWriteHTTPGzippedData(t *testing.T) {
// test that writing snappy data works
func TestWriteHTTPSnappyData(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -443,7 +464,8 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Skipping due to hang on darwin")
}
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -479,7 +501,8 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
}
func TestReceive404ForInvalidEndpoint(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -494,7 +517,8 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
}
func TestWriteHTTPInvalid(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -509,7 +533,8 @@ func TestWriteHTTPInvalid(t *testing.T) {
}
func TestWriteHTTPEmpty(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -524,7 +549,8 @@ func TestWriteHTTPEmpty(t *testing.T) {
}
func TestWriteHTTPTransformHeaderValuesToTagsSingleWrite(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"}
acc := &testutil.Accumulator{}
@ -563,7 +589,8 @@ func TestWriteHTTPTransformHeaderValuesToTagsSingleWrite(t *testing.T) {
}
func TestWriteHTTPTransformHeaderValuesToTagsBulkWrite(t *testing.T) {
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.HTTPHeaderTags = map[string]string{"Present_http_header_1": "presentMeasurementKey1", "Present_http_header_2": "presentMeasurementKey2", "NOT_PRESENT_HEADER": "notPresentMeasurementKey"}
acc := &testutil.Accumulator{}
@ -598,7 +625,8 @@ func TestWriteHTTPQueryParams(t *testing.T) {
TagKeys: []string{"tagKey"},
}
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.DataSource = "query"
listener.Parser = &parser
@ -625,7 +653,8 @@ func TestWriteHTTPFormData(t *testing.T) {
TagKeys: []string{"tagKey"},
}
listener := newTestHTTPListenerV2()
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.Parser = &parser
acc := &testutil.Accumulator{}

View File

@ -249,7 +249,12 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
var metrics []telegraf.Metric
var err error
if h.ParserType == "upstream" {
parser := influx_upstream.NewParser()
parser := influx_upstream.Parser{}
err = parser.Init()
if err != ErrEOF && err != nil {
h.Log.Debugf("Error initializing parser: %v", err.Error())
return
}
parser.SetTimeFunc(influx_upstream.TimeFunc(h.timeFunc))
if precisionStr != "" {
@ -259,13 +264,17 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
metrics, err = parser.Parse(bytes)
} else {
metricHandler := influx.NewMetricHandler()
parser := influx.NewParser(metricHandler)
parser := influx.Parser{}
err = parser.Init()
if err != ErrEOF && err != nil {
h.Log.Debugf("Error initializing parser: %v", err.Error())
return
}
parser.SetTimeFunc(h.timeFunc)
if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
metricHandler.SetTimePrecision(precision)
parser.SetTimePrecision(precision)
}
metrics, err = parser.Parse(bytes)

View File

@ -8,7 +8,7 @@ import (
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -43,8 +43,9 @@ func TestReadsMetricsFromKafkaIntegration(t *testing.T) {
PointBuffer: 100000,
Offset: "oldest",
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.SetParser(parser)
// Verify that we can now gather the sent message
var acc testutil.Accumulator

View File

@ -6,8 +6,8 @@ import (
"github.com/Shopify/sarama"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
@ -44,9 +44,10 @@ func TestRunParser(t *testing.T) {
k.acc = &acc
defer close(k.done)
var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.parser = parser
go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)
@ -61,9 +62,10 @@ func TestRunParserInvalidMsg(t *testing.T) {
k.acc = &acc
defer close(k.done)
var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.parser = parser
go k.receiver()
in <- saramaMsg(invalidMsg)
acc.WaitError(1)
@ -95,9 +97,10 @@ func TestRunParserAndGather(t *testing.T) {
k.acc = &acc
defer close(k.done)
var err error
k.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
k.parser = parser
go k.receiver()
in <- saramaMsg(testMsg)
acc.Wait(1)

View File

@ -8,6 +8,7 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
@ -438,11 +439,11 @@ func TestTopicTag(t *testing.T) {
plugin.TopicTag = tt.topicTag()
plugin.TopicParsing = tt.topicParsing
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
err = plugin.Init()
err := plugin.Init()
require.Equal(t, tt.expectedError, err)
if tt.expectedError != nil {
return

View File

@ -14,7 +14,7 @@ import (
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -49,7 +49,8 @@ func TestReadsMetricsFromNSQ(t *testing.T) {
Nsqd: []string{"127.0.0.1:4155"},
}
p, _ := parsers.NewInfluxParser()
p := &influx.Parser{}
require.NoError(t, p.Init())
consumer.SetParser(p)
var acc testutil.Accumulator
require.Len(t, acc.Metrics, 0, "There should not be any points")

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/influxdata/wlog"
)
@ -50,15 +50,15 @@ func TestSocketListener_tcp_tls(t *testing.T) {
defer testEmptyLog()
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ServerConfig = *pki.TLSServerConfig()
acc := &testutil.Accumulator{}
err = sl.Start(acc)
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
@ -75,15 +75,15 @@ func TestSocketListener_unix_tls(t *testing.T) {
sock := testutil.TempSocket(t)
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "unix://" + sock
sl.ServerConfig = *pki.TLSServerConfig()
acc := &testutil.Accumulator{}
err = sl.Start(acc)
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
@ -102,15 +102,15 @@ func TestSocketListener_tcp(t *testing.T) {
defer testEmptyLog()
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:0"
sl.ReadBufferSize = config.Size(1024)
acc := &testutil.Accumulator{}
err = sl.Start(acc)
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
@ -125,15 +125,15 @@ func TestSocketListener_udp(t *testing.T) {
defer testEmptyLog()
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "udp://127.0.0.1:0"
sl.ReadBufferSize = config.Size(1024)
acc := &testutil.Accumulator{}
err = sl.Start(acc)
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
@ -152,15 +152,15 @@ func TestSocketListener_unix(t *testing.T) {
f, _ := os.Create(sock)
require.NoError(t, f.Close())
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "unix://" + sock
sl.ReadBufferSize = config.Size(1024)
acc := &testutil.Accumulator{}
err = sl.Start(acc)
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
@ -185,8 +185,8 @@ func TestSocketListener_unixgram(t *testing.T) {
t.Cleanup(func() { require.NoError(t, f.Close()) })
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "unixgram://" + sock
@ -208,8 +208,8 @@ func TestSocketListenerDecode_tcp(t *testing.T) {
defer testEmptyLog()
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:0"
@ -217,7 +217,7 @@ func TestSocketListenerDecode_tcp(t *testing.T) {
sl.ContentEncoding = "gzip"
acc := &testutil.Accumulator{}
err = sl.Start(acc)
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
@ -232,8 +232,8 @@ func TestSocketListenerDecode_udp(t *testing.T) {
defer testEmptyLog()
sl := &SocketListener{}
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
sl.SetParser(parser)
sl.Log = testutil.Logger{}
sl.ServiceAddress = "udp://127.0.0.1:0"
@ -241,7 +241,7 @@ func TestSocketListenerDecode_udp(t *testing.T) {
sl.ContentEncoding = "gzip"
acc := &testutil.Accumulator{}
err = sl.Start(acc)
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()

View File

@ -25,6 +25,15 @@ var (
testdataDir = getTestdataDir()
)
func NewInfluxParser() (parsers.Parser, error) {
parser := &influx.Parser{}
err := parser.Init()
if err != nil {
return nil, err
}
return parser, nil
}
func NewTestTail() *Tail {
offsetsMutex.Lock()
offsetsCopy := make(map[string]int64, len(offsets))
@ -68,7 +77,7 @@ func TestTailBadLine(t *testing.T) {
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParserFunc(NewInfluxParser)
err = tt.Init()
require.NoError(t, err)
@ -97,7 +106,7 @@ func TestColoredLine(t *testing.T) {
tt.FromBeginning = true
tt.Filters = []string{"ansi_color"}
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParserFunc(NewInfluxParser)
err = tt.Init()
require.NoError(t, err)
@ -130,7 +139,7 @@ func TestTailDosLineEndings(t *testing.T) {
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParserFunc(NewInfluxParser)
err = tt.Init()
require.NoError(t, err)
@ -607,10 +616,7 @@ func TestCharacterEncoding(t *testing.T) {
WatchMethod: watchMethod,
}
plugin.SetParserFunc(func() (parsers.Parser, error) {
handler := influx.NewMetricHandler()
return influx.NewParser(handler), nil
})
plugin.SetParserFunc(NewInfluxParser)
if tt.offset != 0 {
plugin.offsets = map[string]int64{
@ -650,7 +656,7 @@ func TestTailEOF(t *testing.T) {
tt.Log = testutil.Logger{}
tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(parsers.NewInfluxParser)
tt.SetParserFunc(NewInfluxParser)
err = tt.Init()
require.NoError(t, err)

View File

@ -8,8 +8,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -47,9 +47,10 @@ func BenchmarkTCP(b *testing.B) {
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(b, err)
parser := &influx.Parser{}
require.NoError(b, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{Discard: true}
// send multiple messages to socket
@ -79,9 +80,9 @@ func TestHighTrafficTCP(t *testing.T) {
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
// send multiple messages to socket
@ -109,9 +110,9 @@ func TestConnectTCP(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
}
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
@ -151,15 +152,15 @@ func TestConcurrentConns(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
_, err = net.Dial("tcp", "127.0.0.1:8195")
_, err := net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
@ -190,15 +191,15 @@ func TestConcurrentConns1(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 1,
}
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
_, err = net.Dial("tcp", "127.0.0.1:8196")
_, err := net.Dial("tcp", "127.0.0.1:8196")
require.NoError(t, err)
// Connection over the limit:
@ -227,14 +228,14 @@ func TestCloseConcurrentConns(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
_, err = net.Dial("tcp", "127.0.0.1:8195")
_, err := net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
@ -250,9 +251,9 @@ func TestRunParser(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
listener.wg.Add(1)
go listener.tcpParser()
@ -273,9 +274,9 @@ func TestRunParserInvalidMsg(t *testing.T) {
listener.Log = &testutil.CaptureLogger{}
listener.acc = &testutil.Accumulator{}
var err error
listener.parser, err = parsers.NewInfluxParser()
require.NoError(t, err)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
listener.wg.Add(1)
go listener.tcpParser()

View File

@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -168,8 +168,11 @@ func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[str
}
func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message {
p, _ := parsers.NewInfluxParser()
p := influx.Parser{}
err := p.Init()
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
}
v, _ := m.GetField("value")
psMsg, ok := published[v.(string)]
if !ok {

View File

@ -15,7 +15,9 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/stretchr/testify/require"
"google.golang.org/api/support/bundler"
)
@ -178,7 +180,9 @@ func (t *stubTopic) sendBundle() func(items interface{}) {
}
func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
p, _ := parsers.NewInfluxParser()
p := influx.Parser{}
err := p.Init()
require.NoError(t, err)
metrics, err := p.Parse(msg.Data)
if err != nil {
// Just attempt to base64-decode first before returning error.

View File

@ -8,6 +8,8 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
_ "github.com/influxdata/telegraf/plugins/parsers/graphite"
_ "github.com/influxdata/telegraf/plugins/parsers/grok"
_ "github.com/influxdata/telegraf/plugins/parsers/influx"
_ "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
_ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
_ "github.com/influxdata/telegraf/plugins/parsers/logfmt"

View File

@ -206,8 +206,14 @@ func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []te
}
func (p *Parser) Init() error {
handler := influx.NewMetricHandler()
p.seriesParser = influx.NewSeriesParser(handler)
parser := &influx.Parser{
Type: "series",
}
err := parser.Init()
if err != nil {
return err
}
p.seriesParser = parser
if len(p.Templates) != 0 {
defaultTemplate, err := templating.NewDefaultTemplateWithPattern("measurement*")

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
const (
@ -101,30 +102,15 @@ func convertToParseError(input []byte, rawErr error) error {
// Parser is an InfluxDB Line Protocol parser that implements the
// parsers.Parser interface.
type Parser struct {
DefaultTags map[string]string
DefaultTags map[string]string `toml:"-"`
// If set to "series" a series machine will be initialized, defaults to regular machine
Type string `toml:"-"`
defaultTime TimeFunc
precision lineprotocol.Precision
allowPartial bool
}
// NewParser returns a Parser than accepts line protocol
func NewParser() *Parser {
return &Parser{
defaultTime: time.Now,
precision: lineprotocol.Nanosecond,
}
}
// NewSeriesParser returns a Parser than accepts a measurement and tagset
func NewSeriesParser() *Parser {
return &Parser{
defaultTime: time.Now,
precision: lineprotocol.Nanosecond,
allowPartial: true,
}
}
func (p *Parser) SetTimeFunc(f TimeFunc) {
p.defaultTime = f
}
@ -193,6 +179,29 @@ func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) {
}
}
func (p *Parser) Init() error {
p.defaultTime = time.Now
p.precision = lineprotocol.Nanosecond
p.allowPartial = p.Type == "series"
return nil
}
// InitFromConfig is a compatibility function to construct the parser the old way
func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.DefaultTags = config.DefaultTags
return p.Init()
}
func init() {
parsers.Add("influx_upstream",
func(_ string) telegraf.Parser {
return &Parser{}
},
)
}
// StreamParser is an InfluxDB Line Protocol parser. It is not safe for
// concurrent use in multiple goroutines.
type StreamParser struct {

View File

@ -594,7 +594,8 @@ func parseTests(stream bool) []parseTest {
func TestParser(t *testing.T) {
for _, tt := range parseTests(false) {
t.Run(tt.name, func(t *testing.T) {
parser := NewParser()
parser := Parser{}
require.NoError(t, parser.Init())
parser.SetTimeFunc(DefaultTime)
if tt.timeFunc != nil {
parser.SetTimeFunc(tt.timeFunc)
@ -621,7 +622,8 @@ func TestParser(t *testing.T) {
func BenchmarkParser(b *testing.B) {
for _, tt := range parseTests(false) {
b.Run(tt.name, func(b *testing.B) {
parser := NewParser()
parser := Parser{}
require.NoError(b, parser.Init())
for n := 0; n < b.N; n++ {
metrics, err := parser.Parse(tt.input)
_ = err
@ -728,7 +730,10 @@ func TestSeriesParser(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := NewSeriesParser()
parser := Parser{
Type: "series",
}
require.NoError(t, parser.Init())
if tt.timeFunc != nil {
parser.SetTimeFunc(tt.timeFunc)
}
@ -778,7 +783,8 @@ func TestParserErrorString(t *testing.T) {
for _, tt := range ptests {
t.Run(tt.name, func(t *testing.T) {
parser := NewParser()
parser := Parser{}
require.NoError(t, parser.Init())
_, err := parser.Parse(tt.input)
require.Equal(t, tt.errString, err.Error())

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
)
const (
@ -59,33 +60,23 @@ func (e *ParseError) Error() string {
// Parser is an InfluxDB Line Protocol parser that implements the
// parsers.Parser interface.
type Parser struct {
DefaultTags map[string]string
DefaultTags map[string]string `toml:"-"`
// If set to "series" a series machine will be initialized, defaults to regular machine
Type string `toml:"-"`
sync.Mutex
*machine
handler *MetricHandler
}
// NewParser returns a Parser than accepts line protocol
func NewParser(handler *MetricHandler) *Parser {
return &Parser{
machine: NewMachine(handler),
handler: handler,
}
}
// NewSeriesParser returns a Parser than accepts a measurement and tagset
func NewSeriesParser(handler *MetricHandler) *Parser {
return &Parser{
machine: NewSeriesMachine(handler),
handler: handler,
}
}
func (p *Parser) SetTimeFunc(f TimeFunc) {
p.handler.SetTimeFunc(f)
}
func (p *Parser) SetTimePrecision(u time.Duration) {
p.handler.SetTimePrecision(u)
}
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
p.Lock()
defer p.Unlock()
@ -160,6 +151,32 @@ func (p *Parser) applyDefaultTagsSingle(metric telegraf.Metric) {
}
}
func (p *Parser) Init() error {
p.handler = NewMetricHandler()
if p.Type == "series" {
p.machine = NewSeriesMachine(p.handler)
} else {
p.machine = NewMachine(p.handler)
}
return nil
}
// InitFromConfig is a compatibility function to construct the parser the old way
func (p *Parser) InitFromConfig(config *parsers.Config) error {
p.DefaultTags = config.DefaultTags
return p.Init()
}
func init() {
parsers.Add("influx",
func(_ string) telegraf.Parser {
return &Parser{}
},
)
}
// StreamParser is an InfluxDB Line Protocol parser. It is not safe for
// concurrent use in multiple goroutines.
type StreamParser struct {

View File

@ -568,8 +568,8 @@ var ptests = []struct {
func TestParser(t *testing.T) {
for _, tt := range ptests {
t.Run(tt.name, func(t *testing.T) {
handler := NewMetricHandler()
parser := NewParser(handler)
parser := Parser{}
require.NoError(t, parser.Init())
parser.SetTimeFunc(DefaultTime)
if tt.timeFunc != nil {
parser.SetTimeFunc(tt.timeFunc)
@ -592,8 +592,8 @@ func TestParser(t *testing.T) {
func BenchmarkParser(b *testing.B) {
for _, tt := range ptests {
b.Run(tt.name, func(b *testing.B) {
handler := NewMetricHandler()
parser := NewParser(handler)
parser := Parser{}
require.NoError(b, parser.Init())
for n := 0; n < b.N; n++ {
metrics, err := parser.Parse(tt.input)
_ = err
@ -698,8 +698,10 @@ func TestSeriesParser(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler := NewMetricHandler()
parser := NewSeriesParser(handler)
parser := Parser{
Type: "series",
}
require.NoError(t, parser.Init())
if tt.timeFunc != nil {
parser.SetTimeFunc(tt.timeFunc)
}
@ -749,8 +751,8 @@ func TestParserErrorString(t *testing.T) {
for _, tt := range ptests {
t.Run(tt.name, func(t *testing.T) {
handler := NewMetricHandler()
parser := NewParser(handler)
parser := Parser{}
require.NoError(t, parser.Init())
_, err := parser.Parse(tt.input)
require.Equal(t, tt.errString, err.Error())

View File

@ -68,7 +68,7 @@ func TestMultipleConfigs(t *testing.T) {
}
// Process expected metrics and compare with resulting metrics
expectedOutputs, err := readMetricFile(fmt.Sprintf("testdata/%s/expected.out", f.Name()))
expectedOutputs, err := readMetricFile(t, fmt.Sprintf("testdata/%s/expected.out", f.Name()))
require.NoError(t, err)
resultingMetrics := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expectedOutputs, resultingMetrics, testutil.IgnoreTime())
@ -86,7 +86,7 @@ func TestMultipleConfigs(t *testing.T) {
}
}
func readMetricFile(path string) ([]telegraf.Metric, error) {
func readMetricFile(t *testing.T, path string) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
expectedFile, err := os.Open(path)
if err != nil {
@ -94,7 +94,8 @@ func readMetricFile(path string) ([]telegraf.Metric, error) {
}
defer expectedFile.Close()
parser := influx.NewParser(influx.NewMetricHandler())
parser := &influx.Parser{}
require.NoError(t, parser.Init())
scanner := bufio.NewScanner(expectedFile)
for scanner.Scan() {
line := scanner.Text()

View File

@ -4,8 +4,6 @@ import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
"github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2"
"github.com/influxdata/telegraf/plugins/parsers/temporary/xpath"
@ -195,12 +193,6 @@ func NewParser(config *Config) (Parser, error) {
var err error
var parser Parser
switch config.DataFormat {
case "influx":
if config.InfluxParserType == "upstream" {
parser, err = NewInfluxUpstreamParser()
} else {
parser, err = NewInfluxParser()
}
case "prometheusremotewrite":
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
default:
@ -221,15 +213,6 @@ func NewParser(config *Config) (Parser, error) {
return parser, err
}
func NewInfluxParser() (Parser, error) {
handler := influx.NewMetricHandler()
return influx.NewParser(handler), nil
}
func NewInfluxUpstreamParser() (Parser, error) {
return influx_upstream.NewParser(), nil
}
func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) {
return &prometheusremotewrite.Parser{
DefaultTags: defaultTags,

View File

@ -1302,7 +1302,8 @@ func TestTestCases(t *testing.T) {
},
}
parser := influx.NewParser(influx.NewMetricHandler())
parser := &influx.Parser{}
require.NoError(t, parser.Init())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@ -17,7 +17,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common "github.com/influxdata/telegraf/plugins/common/starlark"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -3258,10 +3258,11 @@ func TestAllScriptTestData(t *testing.T) {
}
}
var parser, _ = parsers.NewInfluxParser() // literally never returns errors.
// parses metric lines out of line protocol following a header, with a trailing blank line
func parseMetricsFrom(t *testing.T, lines []string, header string) (metrics []telegraf.Metric) {
parser := &influx.Parser{}
require.NoError(t, parser.Init())
require.NotZero(t, len(lines), "Expected some lines to parse from .star file, found none")
startIdx := -1
endIdx := len(lines)

View File

@ -60,7 +60,8 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
filename: "testcases/semicolon.conf",
},
}
parser := influx.NewParser(influx.NewMetricHandler())
parser := &influx.Parser{}
require.NoError(t, parser.Init())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -125,7 +126,8 @@ func TestSerializeTransformationBatch(t *testing.T) {
filename: "testcases/semicolon.conf",
},
}
parser := influx.NewParser(influx.NewMetricHandler())
parser := &influx.Parser{}
require.NoError(t, parser.Init())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {