feat(parsers.csv): add option for overwrite tags (#12008)
This commit is contained in:
parent
d10ab3a417
commit
712092bd92
|
|
@ -73,6 +73,9 @@ values.
|
||||||
## will be added as fields.
|
## will be added as fields.
|
||||||
csv_tag_columns = []
|
csv_tag_columns = []
|
||||||
|
|
||||||
|
## Set to true to let the column tags overwrite the metadata and default tags.
|
||||||
|
csv_tag_overwrite = false
|
||||||
|
|
||||||
## The column to extract the name of the metric from. Will not be
|
## The column to extract the name of the metric from. Will not be
|
||||||
## included as field in metric.
|
## included as field in metric.
|
||||||
csv_measurement_column = ""
|
csv_measurement_column = ""
|
||||||
|
|
@ -167,7 +170,7 @@ Config:
|
||||||
csv_metadata_separators = [":", "="]
|
csv_metadata_separators = [":", "="]
|
||||||
csv_metadata_trim_set = " #"
|
csv_metadata_trim_set = " #"
|
||||||
csv_header_row_count = 1
|
csv_header_row_count = 1
|
||||||
csv_tag_columns = ["Version","File Created"]
|
csv_tag_columns = ["Version","cpu"]
|
||||||
csv_timestamp_column = "time"
|
csv_timestamp_column = "time"
|
||||||
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
|
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
|
||||||
```
|
```
|
||||||
|
|
@ -177,14 +180,46 @@ Input:
|
||||||
```csv
|
```csv
|
||||||
# Version=1.1
|
# Version=1.1
|
||||||
# File Created: 2021-11-17T07:02:45+10:00
|
# File Created: 2021-11-17T07:02:45+10:00
|
||||||
measurement,cpu,time_user,time_system,time_idle,time
|
Version,measurement,cpu,time_user,time_system,time_idle,time
|
||||||
cpu,cpu0,42,42,42,2018-09-13T13:03:28Z
|
1.2,cpu,cpu0,42,42,42,2018-09-13T13:03:28Z
|
||||||
```
|
```
|
||||||
|
|
||||||
Output:
|
Output:
|
||||||
|
|
||||||
```text
|
```text
|
||||||
cpu,File\ Created=2021-11-17T07:02:45+10:00,Version=1.1 cpu=cpu0,time_user=42,time_system=42,time_idle=42 1536869008000000000
|
cpu,cpu=cpu0,File\ Created=2021-11-17T07:02:45+10:00,Version=1.1 time_user=42,time_system=42,time_idle=42 1536869008000000000
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Config:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.file]]
|
||||||
|
files = ["example"]
|
||||||
|
data_format = "csv"
|
||||||
|
csv_metadata_rows = 2
|
||||||
|
csv_metadata_separators = [":", "="]
|
||||||
|
csv_metadata_trim_set = " #"
|
||||||
|
csv_header_row_count = 1
|
||||||
|
csv_tag_columns = ["Version","cpu"]
|
||||||
|
csv_tag_overwrite = true
|
||||||
|
csv_timestamp_column = "time"
|
||||||
|
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
|
||||||
|
```
|
||||||
|
|
||||||
|
Input:
|
||||||
|
|
||||||
|
```csv
|
||||||
|
# Version=1.1
|
||||||
|
# File Created: 2021-11-17T07:02:45+10:00
|
||||||
|
Version,measurement,cpu,time_user,time_system,time_idle,time
|
||||||
|
1.2,cpu,cpu0,42,42,42,2018-09-13T13:03:28Z
|
||||||
|
```
|
||||||
|
|
||||||
|
Output:
|
||||||
|
|
||||||
|
```text
|
||||||
|
cpu,cpu=cpu0,File\ Created=2021-11-17T07:02:45+10:00,Version=1.2 time_user=42,time_system=42,time_idle=42 1536869008000000000
|
||||||
|
```
|
||||||
|
|
||||||
|
[time parse]: https://pkg.go.dev/time#Parse
|
||||||
[metric filtering]: /docs/CONFIGURATION.md#metric-filtering
|
[metric filtering]: /docs/CONFIGURATION.md#metric-filtering
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ type Parser struct {
|
||||||
SkipColumns int `toml:"csv_skip_columns"`
|
SkipColumns int `toml:"csv_skip_columns"`
|
||||||
SkipRows int `toml:"csv_skip_rows"`
|
SkipRows int `toml:"csv_skip_rows"`
|
||||||
TagColumns []string `toml:"csv_tag_columns"`
|
TagColumns []string `toml:"csv_tag_columns"`
|
||||||
|
TagOverwrite bool `toml:"csv_tag_overwrite"`
|
||||||
TimestampColumn string `toml:"csv_timestamp_column"`
|
TimestampColumn string `toml:"csv_timestamp_column"`
|
||||||
TimestampFormat string `toml:"csv_timestamp_format"`
|
TimestampFormat string `toml:"csv_timestamp_format"`
|
||||||
Timezone string `toml:"csv_timezone"`
|
Timezone string `toml:"csv_timezone"`
|
||||||
|
|
@ -314,6 +315,18 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
|
||||||
recordFields := make(map[string]interface{})
|
recordFields := make(map[string]interface{})
|
||||||
tags := make(map[string]string)
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
if p.TagOverwrite {
|
||||||
|
// add default tags
|
||||||
|
for k, v := range p.DefaultTags {
|
||||||
|
tags[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// add metadata tags
|
||||||
|
for k, v := range p.metadataTags {
|
||||||
|
tags[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// skip columns in record
|
// skip columns in record
|
||||||
record = record[p.SkipColumns:]
|
record = record[p.SkipColumns:]
|
||||||
outer:
|
outer:
|
||||||
|
|
@ -391,14 +404,16 @@ outer:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// add metadata tags
|
if !p.TagOverwrite {
|
||||||
for k, v := range p.metadataTags {
|
// add metadata tags
|
||||||
tags[k] = v
|
for k, v := range p.metadataTags {
|
||||||
}
|
tags[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
// add default tags
|
// add default tags
|
||||||
for k, v := range p.DefaultTags {
|
for k, v := range p.DefaultTags {
|
||||||
tags[k] = v
|
tags[k] = v
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// will default to plugin name
|
// will default to plugin name
|
||||||
|
|
@ -471,6 +486,7 @@ func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||||
p.ColumnNames = config.CSVColumnNames
|
p.ColumnNames = config.CSVColumnNames
|
||||||
p.ColumnTypes = config.CSVColumnTypes
|
p.ColumnTypes = config.CSVColumnTypes
|
||||||
p.TagColumns = config.CSVTagColumns
|
p.TagColumns = config.CSVTagColumns
|
||||||
|
p.TagOverwrite = config.CSVTagOverwrite
|
||||||
p.MeasurementColumn = config.CSVMeasurementColumn
|
p.MeasurementColumn = config.CSVMeasurementColumn
|
||||||
p.TimestampColumn = config.CSVTimestampColumn
|
p.TimestampColumn = config.CSVTimestampColumn
|
||||||
p.TimestampFormat = config.CSVTimestampFormat
|
p.TimestampFormat = config.CSVTimestampFormat
|
||||||
|
|
|
||||||
|
|
@ -1015,32 +1015,47 @@ timestamp,type,name,status
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) {
|
func TestOverwriteDefaultTagsAndMetaDataTags(t *testing.T) {
|
||||||
// This tests makes sure that the default tags and metadata tags don't overwrite record data
|
csv := []byte(`second=orange
|
||||||
// This test also covers the scenario where the metadata overwrites the default tag
|
fourth=plain
|
||||||
p := &Parser{
|
1.4,apple,hi
|
||||||
ColumnNames: []string{"first", "second", "third"},
|
`)
|
||||||
TagColumns: []string{"second", "third"},
|
defaultTags := map[string]string{"third": "bye", "fourth": "car"}
|
||||||
TimeFunc: DefaultTime,
|
|
||||||
MetadataRows: 2,
|
tests := []struct {
|
||||||
MetadataSeparators: []string{"="},
|
name string
|
||||||
|
tagOverwrite bool
|
||||||
|
expectedTags map[string]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Don't overwrite tags",
|
||||||
|
tagOverwrite: false,
|
||||||
|
expectedTags: map[string]string{"second": "orange", "third": "bye", "fourth": "car"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Overwrite tags",
|
||||||
|
tagOverwrite: true,
|
||||||
|
expectedTags: map[string]string{"second": "apple", "third": "hi", "fourth": "plain"},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
err := p.Init()
|
for _, tt := range tests {
|
||||||
require.NoError(t, err)
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
p.SetDefaultTags(map[string]string{"third": "bye", "fourth": "car"})
|
p := &Parser{
|
||||||
m, err := p.ParseLine("second=orange")
|
ColumnNames: []string{"first", "second", "third"},
|
||||||
require.ErrorIs(t, err, parsers.ErrEOF)
|
TagColumns: []string{"second", "third"},
|
||||||
require.Nil(t, m)
|
TagOverwrite: tt.tagOverwrite,
|
||||||
m, err = p.ParseLine("fourth=plain")
|
MetadataRows: 2,
|
||||||
require.NoError(t, err)
|
MetadataSeparators: []string{"="},
|
||||||
require.Nil(t, m)
|
}
|
||||||
expectedFields := []map[string]interface{}{{"first": 1.4}}
|
|
||||||
expectedTags := []map[string]string{{"second": "orange", "third": "bye", "fourth": "car"}}
|
|
||||||
|
|
||||||
m, err = p.ParseLine("1.4,apple,hi")
|
require.NoError(t, p.Init())
|
||||||
require.NoError(t, err)
|
p.SetDefaultTags(defaultTags)
|
||||||
|
|
||||||
require.Equal(t, expectedFields[0], m.Fields())
|
metrics, err := p.Parse(csv)
|
||||||
require.Equal(t, expectedTags[0], m.Tags())
|
require.NoError(t, err)
|
||||||
|
require.Len(t, metrics, 1)
|
||||||
|
require.EqualValues(t, tt.expectedTags, metrics[0].Tags())
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseCSVResetModeInvalid(t *testing.T) {
|
func TestParseCSVResetModeInvalid(t *testing.T) {
|
||||||
|
|
|
||||||
|
|
@ -150,6 +150,7 @@ type Config struct {
|
||||||
CSVSkipColumns int `toml:"csv_skip_columns"`
|
CSVSkipColumns int `toml:"csv_skip_columns"`
|
||||||
CSVSkipRows int `toml:"csv_skip_rows"`
|
CSVSkipRows int `toml:"csv_skip_rows"`
|
||||||
CSVTagColumns []string `toml:"csv_tag_columns"`
|
CSVTagColumns []string `toml:"csv_tag_columns"`
|
||||||
|
CSVTagOverwrite bool `toml:"csv_tag_overwrite"`
|
||||||
CSVTimestampColumn string `toml:"csv_timestamp_column"`
|
CSVTimestampColumn string `toml:"csv_timestamp_column"`
|
||||||
CSVTimestampFormat string `toml:"csv_timestamp_format"`
|
CSVTimestampFormat string `toml:"csv_timestamp_format"`
|
||||||
CSVTimezone string `toml:"csv_timezone"`
|
CSVTimezone string `toml:"csv_timezone"`
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue