feat(parsers.influx): Allow a user to set the timestamp precision (#13419)

This commit is contained in:
Joshua Powers 2023-07-13 11:09:12 -06:00 committed by GitHub
parent cd1932f031
commit 14f52eae01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 255 additions and 6 deletions

View File

@ -271,7 +271,10 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
parser.SetTimePrecision(precision)
if err = parser.SetTimePrecision(precision); err != nil {
h.Log.Debugf("Error setting precision of parser: %v", err)
return
}
}
metrics, err = parser.Parse(bytes)

View File

@ -19,5 +19,11 @@ Parses metrics using the [Influx Line Protocol][].
## Influx line protocol parser
## 'internal' is the default. 'upstream' is a newer parser that is faster
## and more memory efficient.
## influx_parser_type = "internal"
# influx_parser_type = "internal"
## Influx line protocol timestamp precision
## Time duration to specify the precision of the data's timestamp to parse.
## The default assumes nanosecond (1ns) precision, but users can set to
## second (1s), millisecond (1ms), or microsecond (1us) precision as well.
# influx_timestamp_precision = "1ns"
```

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
@ -103,6 +104,7 @@ func convertToParseError(input []byte, rawErr error) error {
// Parser is an InfluxDB Line Protocol parser that implements the
// parsers.Parser interface.
type Parser struct {
InfluxTimestampPrecsion config.Duration `toml:"influx_timestamp_precision"`
DefaultTags map[string]string `toml:"-"`
// If set to "series" a series machine will be initialized, defaults to regular machine
Type string `toml:"-"`
@ -149,8 +151,10 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
func (p *Parser) SetTimePrecision(u time.Duration) {
func (p *Parser) SetTimePrecision(u time.Duration) error {
switch u {
case 0:
p.precision = lineprotocol.Nanosecond
case time.Nanosecond:
p.precision = lineprotocol.Nanosecond
case time.Microsecond:
@ -159,7 +163,11 @@ func (p *Parser) SetTimePrecision(u time.Duration) {
p.precision = lineprotocol.Millisecond
case time.Second:
p.precision = lineprotocol.Second
default:
return fmt.Errorf("invalid time precision: %d", u)
}
return nil
}
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
@ -181,8 +189,11 @@ func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) {
}
func (p *Parser) Init() error {
if err := p.SetTimePrecision(time.Duration(p.InfluxTimestampPrecsion)); err != nil {
return err
}
p.defaultTime = time.Now
p.precision = lineprotocol.Nanosecond
p.allowPartial = p.Type == "series"
return nil

View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
@ -765,6 +766,114 @@ func TestSeriesParser(t *testing.T) {
}
}
func TestParserTimestampPrecision(t *testing.T) {
var tests = []struct {
name string
precision string
input []byte
metrics []telegraf.Metric
err error
}{
{
name: "default - nanosecond",
precision: "",
input: []byte("cpu value=1 1234567890123123123"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(1),
},
time.Unix(0, 1234567890123123123),
),
},
},
{
name: "nanosecond",
precision: "1ns",
input: []byte("cpu value=2 1234567890123123999"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(2),
},
time.Unix(0, 1234567890123123999),
),
},
},
{
name: "microsecond",
precision: "1us",
input: []byte("cpu value=3 1234567890123123"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(3),
},
time.Unix(0, 1234567890123123000),
),
},
},
{
name: "millisecond",
precision: "1ms",
input: []byte("cpu value=4 1234567890123"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(4),
},
time.Unix(0, 1234567890123000000),
),
},
},
{
name: "second",
precision: "1s",
input: []byte("cpu value=5 1234567890"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(5),
},
time.Unix(0, 1234567890000000000),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := config.Duration(0)
require.NoError(t, d.UnmarshalText([]byte(tt.precision)))
parser := Parser{InfluxTimestampPrecsion: d}
require.NoError(t, parser.Init())
metrics, err := parser.Parse(tt.input)
require.NoError(t, err)
require.Equal(t, tt.metrics, metrics)
})
}
}
func TestParserInvalidTimestampPrecision(t *testing.T) {
d := config.Duration(0)
for _, precision := range []string{"1h", "1d", "2s", "1m", "2ns"} {
require.NoError(t, d.UnmarshalText([]byte(precision)))
parser := Parser{InfluxTimestampPrecsion: d}
require.ErrorContains(t, parser.Init(), "invalid time precision")
}
}
func TestParserErrorString(t *testing.T) {
var ptests = []struct {
name string

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/parsers"
)
@ -60,6 +61,7 @@ func (e *ParseError) Error() string {
// Parser is an InfluxDB Line Protocol parser that implements the
// parsers.Parser interface.
type Parser struct {
InfluxTimestampPrecsion config.Duration `toml:"influx_timestamp_precision"`
DefaultTags map[string]string `toml:"-"`
// If set to "series" a series machine will be initialized, defaults to regular machine
Type string `toml:"-"`
@ -155,6 +157,15 @@ func (p *Parser) Init() error {
p.machine = NewMachine(p.handler)
}
timeDuration := time.Duration(p.InfluxTimestampPrecsion)
switch timeDuration {
case 0:
case time.Nanosecond, time.Microsecond, time.Millisecond, time.Second:
p.SetTimePrecision(timeDuration)
default:
return fmt.Errorf("invalid time precision: %d", p.InfluxTimestampPrecsion)
}
return nil
}

View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
@ -601,6 +602,114 @@ func TestParser(t *testing.T) {
}
}
func TestParserTimestampPrecision(t *testing.T) {
var tests = []struct {
name string
precision string
input []byte
metrics []telegraf.Metric
err error
}{
{
name: "default - nanosecond",
precision: "",
input: []byte("cpu value=1 1234567890123123123"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(1),
},
time.Unix(0, 1234567890123123123),
),
},
},
{
name: "nanosecond",
precision: "1ns",
input: []byte("cpu value=2 1234567890123123999"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(2),
},
time.Unix(0, 1234567890123123999),
),
},
},
{
name: "microsecond",
precision: "1us",
input: []byte("cpu value=3 1234567890123123"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(3),
},
time.Unix(0, 1234567890123123000),
),
},
},
{
name: "millisecond",
precision: "1ms",
input: []byte("cpu value=4 1234567890123"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(4),
},
time.Unix(0, 1234567890123000000),
),
},
},
{
name: "second",
precision: "1s",
input: []byte("cpu value=5 1234567890"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]any{
"value": float64(5),
},
time.Unix(0, 1234567890000000000),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := config.Duration(0)
require.NoError(t, d.UnmarshalText([]byte(tt.precision)))
parser := Parser{InfluxTimestampPrecsion: d}
require.NoError(t, parser.Init())
metrics, err := parser.Parse(tt.input)
require.NoError(t, err)
require.Equal(t, tt.metrics, metrics)
})
}
}
func TestParserInvalidTimestampPrecision(t *testing.T) {
d := config.Duration(0)
for _, precision := range []string{"1h", "1d", "2s", "1m", "2ns"} {
require.NoError(t, d.UnmarshalText([]byte(precision)))
parser := Parser{InfluxTimestampPrecsion: d}
require.ErrorContains(t, parser.Init(), "invalid time precision")
}
}
func BenchmarkParser(b *testing.B) {
for _, tt := range ptests {
b.Run(tt.name, func(b *testing.B) {