feat(processors.converter): Convert tag or field as metric timestamp (#12767)
This commit is contained in:
parent
97fd189d11
commit
736f6ac5bb
|
|
@ -14,6 +14,10 @@ string value to a numeric type, precision may be lost if the number is too
|
|||
large. The largest numeric type this plugin supports is `float64`, and if a
|
||||
string 'number' exceeds its size limit, accuracy may be lost.
|
||||
|
||||
**Note on multiple measurement or timestamps:** Users can provide multiple
|
||||
tags or fields to use as the measurement name or timestamp. However, note that
|
||||
the order in the array is not guaranteed!
|
||||
|
||||
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
|
||||
|
||||
In addition to the plugin-specific configuration settings, plugins support
|
||||
|
|
@ -41,6 +45,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
boolean = []
|
||||
float = []
|
||||
|
||||
## Optional tag to use as metric timestamp
|
||||
# timestamp = []
|
||||
|
||||
## Format of the timestamp determined by the tag above. This can be any of
|
||||
## "unix", "unix_ms", "unix_us", "unix_ns", or a valid Golang time format.
|
||||
## It is required, when using the timestamp option.
|
||||
# timestamp_format = ""
|
||||
|
||||
## Fields to convert
|
||||
##
|
||||
## The table key determines the target type, and the array of key-values
|
||||
|
|
@ -54,6 +66,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
unsigned = []
|
||||
boolean = []
|
||||
float = []
|
||||
|
||||
## Optional field to use as metric timestamp
|
||||
# timestamp = []
|
||||
|
||||
## Format of the timestamp determined by the field above. This can be any
|
||||
## of "unix", "unix_ms", "unix_us", "unix_ns", or a valid Golang time
|
||||
## format. It is required, when using the timestamp option.
|
||||
# timestamp_format = ""
|
||||
```
|
||||
|
||||
### Example
|
||||
|
|
@ -96,3 +116,19 @@ Rename the measurement from a tag value:
|
|||
- mqtt_consumer,topic=sensor temp=42
|
||||
+ sensor temp=42
|
||||
```
|
||||
|
||||
Set the metric timestamp from a tag:
|
||||
|
||||
```toml
|
||||
[[processors.converter]]
|
||||
[processors.converter.tags]
|
||||
timestamp = ["time"]
|
||||
timestamp_format = "unix
|
||||
```
|
||||
|
||||
```diff
|
||||
- metric,time="1677610769" temp=42
|
||||
+ metric temp=42 1677610769
|
||||
```
|
||||
|
||||
This is also possible via the fields converter.
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
)
|
||||
|
||||
|
|
@ -19,13 +20,15 @@ import (
|
|||
var sampleConfig string
|
||||
|
||||
type Conversion struct {
|
||||
Measurement []string `toml:"measurement"`
|
||||
Tag []string `toml:"tag"`
|
||||
String []string `toml:"string"`
|
||||
Integer []string `toml:"integer"`
|
||||
Unsigned []string `toml:"unsigned"`
|
||||
Boolean []string `toml:"boolean"`
|
||||
Float []string `toml:"float"`
|
||||
Measurement []string `toml:"measurement"`
|
||||
Tag []string `toml:"tag"`
|
||||
String []string `toml:"string"`
|
||||
Integer []string `toml:"integer"`
|
||||
Unsigned []string `toml:"unsigned"`
|
||||
Boolean []string `toml:"boolean"`
|
||||
Float []string `toml:"float"`
|
||||
Timestamp []string `toml:"timestamp"`
|
||||
TimestampFormat string `toml:"timestamp_format"`
|
||||
}
|
||||
|
||||
type Converter struct {
|
||||
|
|
@ -45,6 +48,7 @@ type ConversionFilter struct {
|
|||
Unsigned filter.Filter
|
||||
Boolean filter.Filter
|
||||
Float filter.Filter
|
||||
Timestamp filter.Filter
|
||||
}
|
||||
|
||||
func (*Converter) SampleConfig() string {
|
||||
|
|
@ -125,6 +129,11 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
cf.Timestamp, err = filter.Compile(conv.Timestamp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cf, nil
|
||||
}
|
||||
|
||||
|
|
@ -197,6 +206,18 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
|
|||
metric.AddField(key, v)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.tagConversions.Timestamp != nil && p.tagConversions.Timestamp.Match(key) {
|
||||
time, err := internal.ParseTimestamp(p.Tags.TimestampFormat, value, "")
|
||||
if err != nil {
|
||||
p.Log.Errorf("error converting to timestamp [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
metric.RemoveTag(key)
|
||||
metric.SetTime(time)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -297,6 +318,18 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
|
|||
metric.AddField(key, v)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.fieldConversions.Timestamp != nil && p.fieldConversions.Timestamp.Match(key) {
|
||||
time, err := internal.ParseTimestamp(p.Fields.TimestampFormat, value, "")
|
||||
if err != nil {
|
||||
p.Log.Errorf("error converting to timestamp [%T]: %v", value, value)
|
||||
continue
|
||||
}
|
||||
|
||||
metric.RemoveField(key)
|
||||
metric.SetTime(time)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -464,6 +464,207 @@ func TestConverter(t *testing.T) {
|
|||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from unix timestamp field",
|
||||
converter: &Converter{
|
||||
Fields: &Conversion{
|
||||
Timestamp: []string{"time"},
|
||||
TimestampFormat: "unix",
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": 1111111111,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
},
|
||||
time.Unix(1111111111, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from unix timestamp tag",
|
||||
converter: &Converter{
|
||||
Tags: &Conversion{
|
||||
Timestamp: []string{"time"},
|
||||
TimestampFormat: "unix",
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"time": "1677610769",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"a": 41.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 41.0,
|
||||
},
|
||||
time.Unix(1677610769, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from invalid timestamp tag",
|
||||
converter: &Converter{
|
||||
Tags: &Conversion{
|
||||
Timestamp: []string{"time"},
|
||||
TimestampFormat: "blah",
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"time": "1677610769",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"a": 41.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"time": "1677610769",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"a": 41.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from rfc3339 timestamp field",
|
||||
converter: &Converter{
|
||||
Fields: &Conversion{
|
||||
Timestamp: []string{"time"},
|
||||
TimestampFormat: "rfc3339",
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": "2009-02-13T23:31:30Z",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
},
|
||||
time.Unix(1234567890, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "from custom timestamp field",
|
||||
converter: &Converter{
|
||||
Fields: &Conversion{
|
||||
Timestamp: []string{"time"},
|
||||
TimestampFormat: "2006-01-02 15:04:05 MST",
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": "2016-03-01 02:39:59 MST",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
},
|
||||
time.Unix(1456799999, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid timestamp format",
|
||||
converter: &Converter{
|
||||
Fields: &Conversion{
|
||||
Timestamp: []string{"time"},
|
||||
TimestampFormat: "2006-01-0",
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": "2022-07-04 01:30:59 MST",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": "2022-07-04 01:30:59 MST",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no timestamp format",
|
||||
converter: &Converter{
|
||||
Fields: &Conversion{
|
||||
Timestamp: []string{"time"},
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": "2022-07-04 01:30:59 MST",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": "2022-07-04 01:30:59 MST",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
|
@ -478,6 +679,33 @@ func TestConverter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMultipleTimestamps(t *testing.T) {
|
||||
c := &Converter{
|
||||
Fields: &Conversion{
|
||||
Timestamp: []string{"time", "date"},
|
||||
TimestampFormat: "2006-01-02 15:04:05 MST",
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, c.Init())
|
||||
|
||||
input := testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": 42.0,
|
||||
"time": "1990-01-01 12:45:13 EST",
|
||||
"date": "2016-03-01 02:39:59 MST",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
)
|
||||
|
||||
result := c.Apply(input)
|
||||
require.Len(t, result, 1)
|
||||
require.Len(t, result[0].TagList(), 0)
|
||||
require.Len(t, result[0].FieldList(), 1)
|
||||
}
|
||||
|
||||
func TestMeasurement(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
|
|||
|
|
@ -13,6 +13,14 @@
|
|||
boolean = []
|
||||
float = []
|
||||
|
||||
## Optional tag to use as metric timestamp
|
||||
# timestamp = []
|
||||
|
||||
## Format of the timestamp determined by the tag above. This can be any of
|
||||
## "unix", "unix_ms", "unix_us", "unix_ns", or a valid Golang time format.
|
||||
## It is required, when using the timestamp option.
|
||||
# timestamp_format = ""
|
||||
|
||||
## Fields to convert
|
||||
##
|
||||
## The table key determines the target type, and the array of key-values
|
||||
|
|
@ -26,3 +34,11 @@
|
|||
unsigned = []
|
||||
boolean = []
|
||||
float = []
|
||||
|
||||
## Optional field to use as metric timestamp
|
||||
# timestamp = []
|
||||
|
||||
## Format of the timestamp determined by the field above. This can be any
|
||||
## of "unix", "unix_ms", "unix_us", "unix_ns", or a valid Golang time
|
||||
## format. It is required, when using the timestamp option.
|
||||
# timestamp_format = ""
|
||||
|
|
|
|||
Loading…
Reference in New Issue