Add timezone configuration to csv data format (#7619)
This commit is contained in:
parent
23bcc8a3a4
commit
adbc425961
|
|
@ -1779,6 +1779,14 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if node, ok := tbl.Fields["csv_timezone"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.CSVTimezone = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if node, ok := tbl.Fields["csv_header_row_count"]; ok {
|
if node, ok := tbl.Fields["csv_header_row_count"]; ok {
|
||||||
if kv, ok := node.(*ast.KeyValue); ok {
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
if integer, ok := kv.Value.(*ast.Integer); ok {
|
if integer, ok := kv.Value.(*ast.Integer); ok {
|
||||||
|
|
@ -1881,6 +1889,7 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
|
||||||
delete(tbl.Fields, "csv_tag_columns")
|
delete(tbl.Fields, "csv_tag_columns")
|
||||||
delete(tbl.Fields, "csv_timestamp_column")
|
delete(tbl.Fields, "csv_timestamp_column")
|
||||||
delete(tbl.Fields, "csv_timestamp_format")
|
delete(tbl.Fields, "csv_timestamp_format")
|
||||||
|
delete(tbl.Fields, "csv_timezone")
|
||||||
delete(tbl.Fields, "csv_trim_space")
|
delete(tbl.Fields, "csv_trim_space")
|
||||||
delete(tbl.Fields, "form_urlencoded_tag_keys")
|
delete(tbl.Fields, "form_urlencoded_tag_keys")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,11 @@ values.
|
||||||
## The format of time data extracted from `csv_timestamp_column`
|
## The format of time data extracted from `csv_timestamp_column`
|
||||||
## this must be specified if `csv_timestamp_column` is specified
|
## this must be specified if `csv_timestamp_column` is specified
|
||||||
csv_timestamp_format = ""
|
csv_timestamp_format = ""
|
||||||
|
|
||||||
|
## The timezone of time data extracted from `csv_timestamp_column`
|
||||||
|
## in case of there is no timezone information.
|
||||||
|
## It follows the IANA Time Zone database.
|
||||||
|
csv_timezone = ""
|
||||||
```
|
```
|
||||||
#### csv_timestamp_column, csv_timestamp_format
|
#### csv_timestamp_column, csv_timestamp_format
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ type Parser struct {
|
||||||
TimestampFormat string
|
TimestampFormat string
|
||||||
DefaultTags map[string]string
|
DefaultTags map[string]string
|
||||||
TimeFunc func() time.Time
|
TimeFunc func() time.Time
|
||||||
|
Timezone string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Parser) SetTimeFunc(fn TimeFunc) {
|
func (p *Parser) SetTimeFunc(fn TimeFunc) {
|
||||||
|
|
@ -211,7 +212,7 @@ outer:
|
||||||
measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn])
|
measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn])
|
||||||
}
|
}
|
||||||
|
|
||||||
metricTime, err := parseTimestamp(p.TimeFunc, recordFields, p.TimestampColumn, p.TimestampFormat)
|
metricTime, err := parseTimestamp(p.TimeFunc, recordFields, p.TimestampColumn, p.TimestampFormat, p.Timezone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -231,7 +232,7 @@ outer:
|
||||||
// will be the current timestamp, else it will try to parse the time according
|
// will be the current timestamp, else it will try to parse the time according
|
||||||
// to the format.
|
// to the format.
|
||||||
func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface{},
|
func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface{},
|
||||||
timestampColumn, timestampFormat string,
|
timestampColumn, timestampFormat string, Timezone string,
|
||||||
) (time.Time, error) {
|
) (time.Time, error) {
|
||||||
if timestampColumn != "" {
|
if timestampColumn != "" {
|
||||||
if recordFields[timestampColumn] == nil {
|
if recordFields[timestampColumn] == nil {
|
||||||
|
|
@ -242,7 +243,7 @@ func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface
|
||||||
case "":
|
case "":
|
||||||
return time.Time{}, fmt.Errorf("timestamp format must be specified")
|
return time.Time{}, fmt.Errorf("timestamp format must be specified")
|
||||||
default:
|
default:
|
||||||
metricTime, err := internal.ParseTimestamp(timestampFormat, recordFields[timestampColumn], "UTC")
|
metricTime, err := internal.ParseTimestamp(timestampFormat, recordFields[timestampColumn], Timezone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return time.Time{}, err
|
return time.Time{}, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -431,3 +431,23 @@ func TestSkipTimestampColumn(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
testutil.RequireMetricsEqual(t, expected, metrics)
|
testutil.RequireMetricsEqual(t, expected, metrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTimestampTimezone(t *testing.T) {
|
||||||
|
p := Parser{
|
||||||
|
HeaderRowCount: 1,
|
||||||
|
ColumnNames: []string{"first", "second", "third"},
|
||||||
|
MeasurementColumn: "third",
|
||||||
|
TimestampColumn: "first",
|
||||||
|
TimestampFormat: "02/01/06 03:04:05 PM",
|
||||||
|
TimeFunc: DefaultTime,
|
||||||
|
Timezone: "Asia/Jakarta",
|
||||||
|
}
|
||||||
|
testCSV := `line1,line2,line3
|
||||||
|
23/05/09 11:05:06 PM,70,test_name
|
||||||
|
07/11/09 11:05:06 PM,80,test_name2`
|
||||||
|
metrics, err := p.Parse([]byte(testCSV))
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000))
|
||||||
|
require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -144,6 +144,7 @@ type Config struct {
|
||||||
CSVTagColumns []string `toml:"csv_tag_columns"`
|
CSVTagColumns []string `toml:"csv_tag_columns"`
|
||||||
CSVTimestampColumn string `toml:"csv_timestamp_column"`
|
CSVTimestampColumn string `toml:"csv_timestamp_column"`
|
||||||
CSVTimestampFormat string `toml:"csv_timestamp_format"`
|
CSVTimestampFormat string `toml:"csv_timestamp_format"`
|
||||||
|
CSVTimezone string `toml:"csv_timezone"`
|
||||||
CSVTrimSpace bool `toml:"csv_trim_space"`
|
CSVTrimSpace bool `toml:"csv_trim_space"`
|
||||||
|
|
||||||
// FormData configuration
|
// FormData configuration
|
||||||
|
|
@ -218,6 +219,7 @@ func NewParser(config *Config) (Parser, error) {
|
||||||
config.CSVMeasurementColumn,
|
config.CSVMeasurementColumn,
|
||||||
config.CSVTimestampColumn,
|
config.CSVTimestampColumn,
|
||||||
config.CSVTimestampFormat,
|
config.CSVTimestampFormat,
|
||||||
|
config.CSVTimezone,
|
||||||
config.DefaultTags)
|
config.DefaultTags)
|
||||||
case "logfmt":
|
case "logfmt":
|
||||||
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
|
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
|
||||||
|
|
@ -246,6 +248,7 @@ func newCSVParser(metricName string,
|
||||||
nameColumn string,
|
nameColumn string,
|
||||||
timestampColumn string,
|
timestampColumn string,
|
||||||
timestampFormat string,
|
timestampFormat string,
|
||||||
|
timezone string,
|
||||||
defaultTags map[string]string) (Parser, error) {
|
defaultTags map[string]string) (Parser, error) {
|
||||||
|
|
||||||
if headerRowCount == 0 && len(columnNames) == 0 {
|
if headerRowCount == 0 && len(columnNames) == 0 {
|
||||||
|
|
@ -284,6 +287,7 @@ func newCSVParser(metricName string,
|
||||||
MeasurementColumn: nameColumn,
|
MeasurementColumn: nameColumn,
|
||||||
TimestampColumn: timestampColumn,
|
TimestampColumn: timestampColumn,
|
||||||
TimestampFormat: timestampFormat,
|
TimestampFormat: timestampFormat,
|
||||||
|
Timezone: timezone,
|
||||||
DefaultTags: defaultTags,
|
DefaultTags: defaultTags,
|
||||||
TimeFunc: time.Now,
|
TimeFunc: time.Now,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue