diff --git a/plugins/processors/date/date.go b/plugins/processors/date/date.go index 75559537d..3f29182d2 100644 --- a/plugins/processors/date/date.go +++ b/plugins/processors/date/date.go @@ -33,13 +33,13 @@ func (*Date) SampleConfig() string { func (d *Date) Init() error { // Check either TagKey or FieldKey specified if len(d.FieldKey) > 0 && len(d.TagKey) > 0 { - return errors.New("Only one of field_key or tag_key can be specified") + return errors.New("field_key and tag_key cannot be specified at the same time") } else if len(d.FieldKey) == 0 && len(d.TagKey) == 0 { - return errors.New("One of field_key or tag_key must be specified") + return errors.New("at least one of field_key or tag_key must be specified") } - var err error // LoadLocation returns UTC if timezone is the empty string. + var err error d.location, err = time.LoadLocation(d.Timezone) return err } diff --git a/plugins/processors/date/date_test.go b/plugins/processors/date/date_test.go index 83df86b3e..e483a0e6d 100644 --- a/plugins/processors/date/date_test.go +++ b/plugins/processors/date/date_test.go @@ -1,6 +1,7 @@ package date import ( + "sync" "testing" "time" @@ -12,153 +13,198 @@ import ( "github.com/influxdata/telegraf/testutil" ) -func MustMetric(name string, tags map[string]string, fields map[string]interface{}, metricTime time.Time) telegraf.Metric { - if tags == nil { - tags = map[string]string{} - } - if fields == nil { - fields = map[string]interface{}{} - } - m := metric.New(name, tags, fields, metricTime) - return m -} - func TestTagAndField(t *testing.T) { - dateFormatTagAndField := Date{ + plugin := &Date{ TagKey: "month", FieldKey: "month", } - err := dateFormatTagAndField.Init() - require.Error(t, err) + require.Error(t, plugin.Init()) } func TestNoOutputSpecified(t *testing.T) { - dateFormatNoOutput := Date{} - err := dateFormatNoOutput.Init() - require.Error(t, err) + plugin := &Date{} + require.Error(t, plugin.Init()) } func TestMonthTag(t *testing.T) { - dateFormatMonth := Date{ + now := time.Now() + month := now.Format("Jan") + + input := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + expected := []telegraf.Metric{ + metric.New("foo", map[string]string{"month": month}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{"month": month}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{"month": month}, map[string]interface{}{"value": 42}, now), + } + + plugin := &Date{ TagKey: "month", DateFormat: "Jan", } - err := dateFormatMonth.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) - currentTime := time.Now() - month := currentTime.Format("Jan") - - m1 := MustMetric("foo", nil, nil, currentTime) - m2 := MustMetric("bar", nil, nil, currentTime) - m3 := MustMetric("baz", nil, nil, currentTime) - monthApply := dateFormatMonth.Apply(m1, m2, m3) - require.Equal(t, map[string]string{"month": month}, monthApply[0].Tags(), "should add tag 'month'") - require.Equal(t, map[string]string{"month": month}, monthApply[1].Tags(), "should add tag 'month'") - require.Equal(t, map[string]string{"month": month}, monthApply[2].Tags(), "should add tag 'month'") + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) } func TestMonthField(t *testing.T) { - dateFormatMonth := Date{ + now := time.Now() + month := now.Format("Jan") + + input := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + expected := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "month": month}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "month": month}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "month": month}, now), + } + + plugin := &Date{ FieldKey: "month", DateFormat: "Jan", } + require.NoError(t, plugin.Init()) - err := dateFormatMonth.Init() - require.NoError(t, err) - - currentTime := time.Now() - month := currentTime.Format("Jan") - - m1 := MustMetric("foo", nil, nil, currentTime) - m2 := MustMetric("bar", nil, nil, currentTime) - m3 := MustMetric("baz", nil, nil, currentTime) - monthApply := dateFormatMonth.Apply(m1, m2, m3) - require.Equal(t, map[string]interface{}{"month": month}, monthApply[0].Fields(), "should add field 'month'") - require.Equal(t, map[string]interface{}{"month": month}, monthApply[1].Fields(), "should add field 'month'") - require.Equal(t, map[string]interface{}{"month": month}, monthApply[2].Fields(), "should add field 'month'") + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) } func TestOldDateTag(t *testing.T) { - dateFormatYear := Date{ + now := time.Date(1993, 05, 27, 0, 0, 0, 0, time.UTC) + + input := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + expected := []telegraf.Metric{ + metric.New("foo", map[string]string{"year": "1993"}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{"year": "1993"}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{"year": "1993"}, map[string]interface{}{"value": 42}, now), + } + + plugin := &Date{ TagKey: "year", DateFormat: "2006", } + require.NoError(t, plugin.Init()) - err := dateFormatYear.Init() - require.NoError(t, err) - - m7 := MustMetric("foo", nil, nil, time.Date(1993, 05, 27, 0, 0, 0, 0, time.UTC)) - customDateApply := dateFormatYear.Apply(m7) - require.Equal(t, map[string]string{"year": "1993"}, customDateApply[0].Tags(), "should add tag 'year'") + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) } func TestFieldUnix(t *testing.T) { - dateFormatUnix := Date{ + now := time.Now() + ts := now.Unix() + + input := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + expected := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix": ts}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix": ts}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix": ts}, now), + } + + plugin := &Date{ FieldKey: "unix", DateFormat: "unix", } + require.NoError(t, plugin.Init()) - err := dateFormatUnix.Init() - require.NoError(t, err) - - currentTime := time.Now() - unixTime := currentTime.Unix() - - m8 := MustMetric("foo", nil, nil, currentTime) - unixApply := dateFormatUnix.Apply(m8) - require.Equal(t, map[string]interface{}{"unix": unixTime}, unixApply[0].Fields(), "should add unix time in s as field 'unix'") + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) } func TestFieldUnixNano(t *testing.T) { - dateFormatUnixNano := Date{ + now := time.Now() + ts := now.UnixNano() + + input := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + expected := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix_ns": ts}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix_ns": ts}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix_ns": ts}, now), + } + + plugin := &Date{ FieldKey: "unix_ns", DateFormat: "unix_ns", } + require.NoError(t, plugin.Init()) - err := dateFormatUnixNano.Init() - require.NoError(t, err) - - currentTime := time.Now() - unixNanoTime := currentTime.UnixNano() - - m9 := MustMetric("foo", nil, nil, currentTime) - unixNanoApply := dateFormatUnixNano.Apply(m9) - require.Equal(t, map[string]interface{}{"unix_ns": unixNanoTime}, unixNanoApply[0].Fields(), "should add unix time in ns as field 'unix_ns'") + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) } func TestFieldUnixMillis(t *testing.T) { - dateFormatUnixMillis := Date{ + now := time.Now() + ts := now.UnixMilli() + + input := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + expected := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix_ms": ts}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix_ms": ts}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix_ms": ts}, now), + } + + plugin := &Date{ FieldKey: "unix_ms", DateFormat: "unix_ms", } + require.NoError(t, plugin.Init()) - err := dateFormatUnixMillis.Init() - require.NoError(t, err) - - currentTime := time.Now() - unixMillisTime := currentTime.UnixNano() / 1000000 - - m10 := MustMetric("foo", nil, nil, currentTime) - unixMillisApply := dateFormatUnixMillis.Apply(m10) - require.Equal(t, map[string]interface{}{"unix_ms": unixMillisTime}, unixMillisApply[0].Fields(), "should add unix time in ms as field 'unix_ms'") + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) } func TestFieldUnixMicros(t *testing.T) { - dateFormatUnixMicros := Date{ + now := time.Now() + ts := now.UnixMicro() + + input := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + expected := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix_us": ts}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix_us": ts}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix_us": ts}, now), + } + + plugin := &Date{ FieldKey: "unix_us", DateFormat: "unix_us", } + require.NoError(t, plugin.Init()) - err := dateFormatUnixMicros.Init() - require.NoError(t, err) - - currentTime := time.Now() - unixMicrosTime := currentTime.UnixNano() / 1000 - - m11 := MustMetric("foo", nil, nil, currentTime) - unixMicrosApply := dateFormatUnixMicros.Apply(m11) - require.Equal(t, map[string]interface{}{"unix_us": unixMicrosTime}, unixMicrosApply[0].Fields(), "should add unix time in us as field 'unix_us'") + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) } func TestDateOffset(t *testing.T) { @@ -167,11 +213,9 @@ func TestDateOffset(t *testing.T) { DateFormat: "15", DateOffset: config.Duration(2 * time.Hour), } + require.NoError(t, plugin.Init()) - err := plugin.Init() - require.NoError(t, err) - - m := testutil.MustMetric( + input := testutil.MustMetric( "cpu", map[string]string{}, map[string]interface{}{ @@ -193,6 +237,58 @@ func TestDateOffset(t *testing.T) { ), } - actual := plugin.Apply(m) + actual := plugin.Apply(input) testutil.RequireMetricsEqual(t, expected, actual) } + +func TestTracking(t *testing.T) { + now := time.Now() + ts := now.UnixMicro() + + inputRaw := []telegraf.Metric{ + metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now), + metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now), + } + + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw)) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + + input := make([]telegraf.Metric, 0, len(inputRaw)) + expected := make([]telegraf.Metric, 0, len(input)) + for _, m := range inputRaw { + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + + em := m.Copy() + em.AddField("unix_us", ts) + expected = append(expected, m) + } + + plugin := &Date{ + FieldKey: "unix_us", + DateFormat: "unix_us", + } + require.NoError(t, plugin.Init()) + + // Process expected metrics and compare with resulting metrics + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) + + // Simulate output acknowledging delivery + for _, m := range actual { + m.Accept() + } + + // Check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(expected) == len(delivered) + }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) +}