diff --git a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go index 72bbbfb06..6a66f76ad 100644 --- a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go +++ b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go @@ -271,7 +271,10 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc { if precisionStr != "" { precision := getPrecisionMultiplier(precisionStr) - parser.SetTimePrecision(precision) + if err = parser.SetTimePrecision(precision); err != nil { + h.Log.Debugf("Error setting precision of parser: %v", err) + return + } } metrics, err = parser.Parse(bytes) diff --git a/plugins/parsers/influx/README.md b/plugins/parsers/influx/README.md index f5862a4b4..8828bd794 100644 --- a/plugins/parsers/influx/README.md +++ b/plugins/parsers/influx/README.md @@ -19,5 +19,11 @@ Parses metrics using the [Influx Line Protocol][]. ## Influx line protocol parser ## 'internal' is the default. 'upstream' is a newer parser that is faster ## and more memory efficient. - ## influx_parser_type = "internal" + # influx_parser_type = "internal" + + ## Influx line protocol timestamp precision + ## Time duration to specify the precision of the data's timestamp to parse. + ## The default assumes nanosecond (1ns) precision, but users can set to + ## second (1s), millisecond (1ms), or microsecond (1us) precision as well. + # influx_timestamp_precision = "1ns" ``` diff --git a/plugins/parsers/influx/influx_upstream/parser.go b/plugins/parsers/influx/influx_upstream/parser.go index 0e0201aa3..095289f95 100644 --- a/plugins/parsers/influx/influx_upstream/parser.go +++ b/plugins/parsers/influx/influx_upstream/parser.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -103,7 +104,8 @@ func convertToParseError(input []byte, rawErr error) error { // Parser is an InfluxDB Line Protocol parser that implements the // parsers.Parser interface. type Parser struct { - DefaultTags map[string]string `toml:"-"` + InfluxTimestampPrecsion config.Duration `toml:"influx_timestamp_precision"` + DefaultTags map[string]string `toml:"-"` // If set to "series" a series machine will be initialized, defaults to regular machine Type string `toml:"-"` @@ -149,8 +151,10 @@ func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } -func (p *Parser) SetTimePrecision(u time.Duration) { +func (p *Parser) SetTimePrecision(u time.Duration) error { switch u { + case 0: + p.precision = lineprotocol.Nanosecond case time.Nanosecond: p.precision = lineprotocol.Nanosecond case time.Microsecond: @@ -159,7 +163,11 @@ func (p *Parser) SetTimePrecision(u time.Duration) { p.precision = lineprotocol.Millisecond case time.Second: p.precision = lineprotocol.Second + default: + return fmt.Errorf("invalid time precision: %d", u) } + + return nil } func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) { @@ -181,8 +189,11 @@ func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) { } func (p *Parser) Init() error { + if err := p.SetTimePrecision(time.Duration(p.InfluxTimestampPrecsion)); err != nil { + return err + } + p.defaultTime = time.Now - p.precision = lineprotocol.Nanosecond p.allowPartial = p.Type == "series" return nil diff --git a/plugins/parsers/influx/influx_upstream/parser_test.go b/plugins/parsers/influx/influx_upstream/parser_test.go index f6e08ea73..9af4ed900 100644 --- a/plugins/parsers/influx/influx_upstream/parser_test.go +++ b/plugins/parsers/influx/influx_upstream/parser_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -765,6 +766,114 @@ func TestSeriesParser(t *testing.T) { } } +func TestParserTimestampPrecision(t *testing.T) { + var tests = []struct { + name string + precision string + input []byte + metrics []telegraf.Metric + err error + }{ + { + name: "default - nanosecond", + precision: "", + input: []byte("cpu value=1 1234567890123123123"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(1), + }, + time.Unix(0, 1234567890123123123), + ), + }, + }, + { + name: "nanosecond", + precision: "1ns", + input: []byte("cpu value=2 1234567890123123999"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(2), + }, + time.Unix(0, 1234567890123123999), + ), + }, + }, + { + name: "microsecond", + precision: "1us", + input: []byte("cpu value=3 1234567890123123"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(3), + }, + time.Unix(0, 1234567890123123000), + ), + }, + }, + { + name: "millisecond", + precision: "1ms", + input: []byte("cpu value=4 1234567890123"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(4), + }, + time.Unix(0, 1234567890123000000), + ), + }, + }, + { + name: "second", + precision: "1s", + input: []byte("cpu value=5 1234567890"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(5), + }, + time.Unix(0, 1234567890000000000), + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := config.Duration(0) + require.NoError(t, d.UnmarshalText([]byte(tt.precision))) + parser := Parser{InfluxTimestampPrecsion: d} + require.NoError(t, parser.Init()) + + metrics, err := parser.Parse(tt.input) + require.NoError(t, err) + + require.Equal(t, tt.metrics, metrics) + }) + } +} + +func TestParserInvalidTimestampPrecision(t *testing.T) { + d := config.Duration(0) + for _, precision := range []string{"1h", "1d", "2s", "1m", "2ns"} { + require.NoError(t, d.UnmarshalText([]byte(precision))) + parser := Parser{InfluxTimestampPrecsion: d} + require.ErrorContains(t, parser.Init(), "invalid time precision") + } +} + func TestParserErrorString(t *testing.T) { var ptests = []struct { name string diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index 2bec40f49..8aa4432ee 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -60,7 +61,8 @@ func (e *ParseError) Error() string { // Parser is an InfluxDB Line Protocol parser that implements the // parsers.Parser interface. type Parser struct { - DefaultTags map[string]string `toml:"-"` + InfluxTimestampPrecsion config.Duration `toml:"influx_timestamp_precision"` + DefaultTags map[string]string `toml:"-"` // If set to "series" a series machine will be initialized, defaults to regular machine Type string `toml:"-"` @@ -155,6 +157,15 @@ func (p *Parser) Init() error { p.machine = NewMachine(p.handler) } + timeDuration := time.Duration(p.InfluxTimestampPrecsion) + switch timeDuration { + case 0: + case time.Nanosecond, time.Microsecond, time.Millisecond, time.Second: + p.SetTimePrecision(timeDuration) + default: + return fmt.Errorf("invalid time precision: %d", p.InfluxTimestampPrecsion) + } + return nil } diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go index a3d62e24a..a76fe6f64 100644 --- a/plugins/parsers/influx/parser_test.go +++ b/plugins/parsers/influx/parser_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -601,6 +602,114 @@ func TestParser(t *testing.T) { } } +func TestParserTimestampPrecision(t *testing.T) { + var tests = []struct { + name string + precision string + input []byte + metrics []telegraf.Metric + err error + }{ + { + name: "default - nanosecond", + precision: "", + input: []byte("cpu value=1 1234567890123123123"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(1), + }, + time.Unix(0, 1234567890123123123), + ), + }, + }, + { + name: "nanosecond", + precision: "1ns", + input: []byte("cpu value=2 1234567890123123999"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(2), + }, + time.Unix(0, 1234567890123123999), + ), + }, + }, + { + name: "microsecond", + precision: "1us", + input: []byte("cpu value=3 1234567890123123"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(3), + }, + time.Unix(0, 1234567890123123000), + ), + }, + }, + { + name: "millisecond", + precision: "1ms", + input: []byte("cpu value=4 1234567890123"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(4), + }, + time.Unix(0, 1234567890123000000), + ), + }, + }, + { + name: "second", + precision: "1s", + input: []byte("cpu value=5 1234567890"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]any{ + "value": float64(5), + }, + time.Unix(0, 1234567890000000000), + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := config.Duration(0) + require.NoError(t, d.UnmarshalText([]byte(tt.precision))) + parser := Parser{InfluxTimestampPrecsion: d} + require.NoError(t, parser.Init()) + + metrics, err := parser.Parse(tt.input) + require.NoError(t, err) + + require.Equal(t, tt.metrics, metrics) + }) + } +} + +func TestParserInvalidTimestampPrecision(t *testing.T) { + d := config.Duration(0) + for _, precision := range []string{"1h", "1d", "2s", "1m", "2ns"} { + require.NoError(t, d.UnmarshalText([]byte(precision))) + parser := Parser{InfluxTimestampPrecsion: d} + require.ErrorContains(t, parser.Init(), "invalid time precision") + } +} + func BenchmarkParser(b *testing.B) { for _, tt := range ptests { b.Run(tt.name, func(b *testing.B) {