test(processors.date): Add unit-test for tracking metrics (#14738)
This commit is contained in:
parent
a01d9624e2
commit
ba036e4464
|
|
@ -33,13 +33,13 @@ func (*Date) SampleConfig() string {
|
||||||
func (d *Date) Init() error {
|
func (d *Date) Init() error {
|
||||||
// Check either TagKey or FieldKey specified
|
// Check either TagKey or FieldKey specified
|
||||||
if len(d.FieldKey) > 0 && len(d.TagKey) > 0 {
|
if len(d.FieldKey) > 0 && len(d.TagKey) > 0 {
|
||||||
return errors.New("Only one of field_key or tag_key can be specified")
|
return errors.New("field_key and tag_key cannot be specified at the same time")
|
||||||
} else if len(d.FieldKey) == 0 && len(d.TagKey) == 0 {
|
} else if len(d.FieldKey) == 0 && len(d.TagKey) == 0 {
|
||||||
return errors.New("One of field_key or tag_key must be specified")
|
return errors.New("at least one of field_key or tag_key must be specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
|
||||||
// LoadLocation returns UTC if timezone is the empty string.
|
// LoadLocation returns UTC if timezone is the empty string.
|
||||||
|
var err error
|
||||||
d.location, err = time.LoadLocation(d.Timezone)
|
d.location, err = time.LoadLocation(d.Timezone)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package date
|
package date
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -12,153 +13,198 @@ import (
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func MustMetric(name string, tags map[string]string, fields map[string]interface{}, metricTime time.Time) telegraf.Metric {
|
|
||||||
if tags == nil {
|
|
||||||
tags = map[string]string{}
|
|
||||||
}
|
|
||||||
if fields == nil {
|
|
||||||
fields = map[string]interface{}{}
|
|
||||||
}
|
|
||||||
m := metric.New(name, tags, fields, metricTime)
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTagAndField(t *testing.T) {
|
func TestTagAndField(t *testing.T) {
|
||||||
dateFormatTagAndField := Date{
|
plugin := &Date{
|
||||||
TagKey: "month",
|
TagKey: "month",
|
||||||
FieldKey: "month",
|
FieldKey: "month",
|
||||||
}
|
}
|
||||||
err := dateFormatTagAndField.Init()
|
require.Error(t, plugin.Init())
|
||||||
require.Error(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNoOutputSpecified(t *testing.T) {
|
func TestNoOutputSpecified(t *testing.T) {
|
||||||
dateFormatNoOutput := Date{}
|
plugin := &Date{}
|
||||||
err := dateFormatNoOutput.Init()
|
require.Error(t, plugin.Init())
|
||||||
require.Error(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMonthTag(t *testing.T) {
|
func TestMonthTag(t *testing.T) {
|
||||||
dateFormatMonth := Date{
|
now := time.Now()
|
||||||
|
month := now.Format("Jan")
|
||||||
|
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{"month": month}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{"month": month}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{"month": month}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
TagKey: "month",
|
TagKey: "month",
|
||||||
DateFormat: "Jan",
|
DateFormat: "Jan",
|
||||||
}
|
}
|
||||||
err := dateFormatMonth.Init()
|
require.NoError(t, plugin.Init())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
currentTime := time.Now()
|
actual := plugin.Apply(input...)
|
||||||
month := currentTime.Format("Jan")
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
m1 := MustMetric("foo", nil, nil, currentTime)
|
|
||||||
m2 := MustMetric("bar", nil, nil, currentTime)
|
|
||||||
m3 := MustMetric("baz", nil, nil, currentTime)
|
|
||||||
monthApply := dateFormatMonth.Apply(m1, m2, m3)
|
|
||||||
require.Equal(t, map[string]string{"month": month}, monthApply[0].Tags(), "should add tag 'month'")
|
|
||||||
require.Equal(t, map[string]string{"month": month}, monthApply[1].Tags(), "should add tag 'month'")
|
|
||||||
require.Equal(t, map[string]string{"month": month}, monthApply[2].Tags(), "should add tag 'month'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMonthField(t *testing.T) {
|
func TestMonthField(t *testing.T) {
|
||||||
dateFormatMonth := Date{
|
now := time.Now()
|
||||||
|
month := now.Format("Jan")
|
||||||
|
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "month": month}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "month": month}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "month": month}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
FieldKey: "month",
|
FieldKey: "month",
|
||||||
DateFormat: "Jan",
|
DateFormat: "Jan",
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
err := dateFormatMonth.Init()
|
actual := plugin.Apply(input...)
|
||||||
require.NoError(t, err)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
currentTime := time.Now()
|
|
||||||
month := currentTime.Format("Jan")
|
|
||||||
|
|
||||||
m1 := MustMetric("foo", nil, nil, currentTime)
|
|
||||||
m2 := MustMetric("bar", nil, nil, currentTime)
|
|
||||||
m3 := MustMetric("baz", nil, nil, currentTime)
|
|
||||||
monthApply := dateFormatMonth.Apply(m1, m2, m3)
|
|
||||||
require.Equal(t, map[string]interface{}{"month": month}, monthApply[0].Fields(), "should add field 'month'")
|
|
||||||
require.Equal(t, map[string]interface{}{"month": month}, monthApply[1].Fields(), "should add field 'month'")
|
|
||||||
require.Equal(t, map[string]interface{}{"month": month}, monthApply[2].Fields(), "should add field 'month'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOldDateTag(t *testing.T) {
|
func TestOldDateTag(t *testing.T) {
|
||||||
dateFormatYear := Date{
|
now := time.Date(1993, 05, 27, 0, 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{"year": "1993"}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{"year": "1993"}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{"year": "1993"}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
TagKey: "year",
|
TagKey: "year",
|
||||||
DateFormat: "2006",
|
DateFormat: "2006",
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
err := dateFormatYear.Init()
|
actual := plugin.Apply(input...)
|
||||||
require.NoError(t, err)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
m7 := MustMetric("foo", nil, nil, time.Date(1993, 05, 27, 0, 0, 0, 0, time.UTC))
|
|
||||||
customDateApply := dateFormatYear.Apply(m7)
|
|
||||||
require.Equal(t, map[string]string{"year": "1993"}, customDateApply[0].Tags(), "should add tag 'year'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFieldUnix(t *testing.T) {
|
func TestFieldUnix(t *testing.T) {
|
||||||
dateFormatUnix := Date{
|
now := time.Now()
|
||||||
|
ts := now.Unix()
|
||||||
|
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix": ts}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix": ts}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix": ts}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
FieldKey: "unix",
|
FieldKey: "unix",
|
||||||
DateFormat: "unix",
|
DateFormat: "unix",
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
err := dateFormatUnix.Init()
|
actual := plugin.Apply(input...)
|
||||||
require.NoError(t, err)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
currentTime := time.Now()
|
|
||||||
unixTime := currentTime.Unix()
|
|
||||||
|
|
||||||
m8 := MustMetric("foo", nil, nil, currentTime)
|
|
||||||
unixApply := dateFormatUnix.Apply(m8)
|
|
||||||
require.Equal(t, map[string]interface{}{"unix": unixTime}, unixApply[0].Fields(), "should add unix time in s as field 'unix'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFieldUnixNano(t *testing.T) {
|
func TestFieldUnixNano(t *testing.T) {
|
||||||
dateFormatUnixNano := Date{
|
now := time.Now()
|
||||||
|
ts := now.UnixNano()
|
||||||
|
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix_ns": ts}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix_ns": ts}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix_ns": ts}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
FieldKey: "unix_ns",
|
FieldKey: "unix_ns",
|
||||||
DateFormat: "unix_ns",
|
DateFormat: "unix_ns",
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
err := dateFormatUnixNano.Init()
|
actual := plugin.Apply(input...)
|
||||||
require.NoError(t, err)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
currentTime := time.Now()
|
|
||||||
unixNanoTime := currentTime.UnixNano()
|
|
||||||
|
|
||||||
m9 := MustMetric("foo", nil, nil, currentTime)
|
|
||||||
unixNanoApply := dateFormatUnixNano.Apply(m9)
|
|
||||||
require.Equal(t, map[string]interface{}{"unix_ns": unixNanoTime}, unixNanoApply[0].Fields(), "should add unix time in ns as field 'unix_ns'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFieldUnixMillis(t *testing.T) {
|
func TestFieldUnixMillis(t *testing.T) {
|
||||||
dateFormatUnixMillis := Date{
|
now := time.Now()
|
||||||
|
ts := now.UnixMilli()
|
||||||
|
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix_ms": ts}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix_ms": ts}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix_ms": ts}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
FieldKey: "unix_ms",
|
FieldKey: "unix_ms",
|
||||||
DateFormat: "unix_ms",
|
DateFormat: "unix_ms",
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
err := dateFormatUnixMillis.Init()
|
actual := plugin.Apply(input...)
|
||||||
require.NoError(t, err)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
currentTime := time.Now()
|
|
||||||
unixMillisTime := currentTime.UnixNano() / 1000000
|
|
||||||
|
|
||||||
m10 := MustMetric("foo", nil, nil, currentTime)
|
|
||||||
unixMillisApply := dateFormatUnixMillis.Apply(m10)
|
|
||||||
require.Equal(t, map[string]interface{}{"unix_ms": unixMillisTime}, unixMillisApply[0].Fields(), "should add unix time in ms as field 'unix_ms'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFieldUnixMicros(t *testing.T) {
|
func TestFieldUnixMicros(t *testing.T) {
|
||||||
dateFormatUnixMicros := Date{
|
now := time.Now()
|
||||||
|
ts := now.UnixMicro()
|
||||||
|
|
||||||
|
input := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42, "unix_us": ts}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42, "unix_us": ts}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42, "unix_us": ts}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
FieldKey: "unix_us",
|
FieldKey: "unix_us",
|
||||||
DateFormat: "unix_us",
|
DateFormat: "unix_us",
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
err := dateFormatUnixMicros.Init()
|
actual := plugin.Apply(input...)
|
||||||
require.NoError(t, err)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
currentTime := time.Now()
|
|
||||||
unixMicrosTime := currentTime.UnixNano() / 1000
|
|
||||||
|
|
||||||
m11 := MustMetric("foo", nil, nil, currentTime)
|
|
||||||
unixMicrosApply := dateFormatUnixMicros.Apply(m11)
|
|
||||||
require.Equal(t, map[string]interface{}{"unix_us": unixMicrosTime}, unixMicrosApply[0].Fields(), "should add unix time in us as field 'unix_us'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDateOffset(t *testing.T) {
|
func TestDateOffset(t *testing.T) {
|
||||||
|
|
@ -167,11 +213,9 @@ func TestDateOffset(t *testing.T) {
|
||||||
DateFormat: "15",
|
DateFormat: "15",
|
||||||
DateOffset: config.Duration(2 * time.Hour),
|
DateOffset: config.Duration(2 * time.Hour),
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
err := plugin.Init()
|
input := testutil.MustMetric(
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
m := testutil.MustMetric(
|
|
||||||
"cpu",
|
"cpu",
|
||||||
map[string]string{},
|
map[string]string{},
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
|
|
@ -193,6 +237,58 @@ func TestDateOffset(t *testing.T) {
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
actual := plugin.Apply(m)
|
actual := plugin.Apply(input)
|
||||||
testutil.RequireMetricsEqual(t, expected, actual)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTracking(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
ts := now.UnixMicro()
|
||||||
|
|
||||||
|
inputRaw := []telegraf.Metric{
|
||||||
|
metric.New("foo", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("bar", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
metric.New("baz", map[string]string{}, map[string]interface{}{"value": 42}, now),
|
||||||
|
}
|
||||||
|
|
||||||
|
var mu sync.Mutex
|
||||||
|
delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
|
||||||
|
notify := func(di telegraf.DeliveryInfo) {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
delivered = append(delivered, di)
|
||||||
|
}
|
||||||
|
|
||||||
|
input := make([]telegraf.Metric, 0, len(inputRaw))
|
||||||
|
expected := make([]telegraf.Metric, 0, len(input))
|
||||||
|
for _, m := range inputRaw {
|
||||||
|
tm, _ := metric.WithTracking(m, notify)
|
||||||
|
input = append(input, tm)
|
||||||
|
|
||||||
|
em := m.Copy()
|
||||||
|
em.AddField("unix_us", ts)
|
||||||
|
expected = append(expected, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &Date{
|
||||||
|
FieldKey: "unix_us",
|
||||||
|
DateFormat: "unix_us",
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
|
// Process expected metrics and compare with resulting metrics
|
||||||
|
actual := plugin.Apply(input...)
|
||||||
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
|
// Simulate output acknowledging delivery
|
||||||
|
for _, m := range actual {
|
||||||
|
m.Accept()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check delivery
|
||||||
|
require.Eventuallyf(t, func() bool {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
return len(expected) == len(delivered)
|
||||||
|
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue