fix(processors.template): Handle tracking metrics correctly (#13947)
This commit is contained in:
parent
2425ebc2d9
commit
c65340ac7a
|
|
@ -126,7 +126,7 @@ type Metric interface {
|
|||
|
||||
// TemplateMetric is an interface to use in templates (e.g text/template)
|
||||
// to generate complex strings from metric properties
|
||||
// e.g. '{{.Neasurement}}-{{.Tag "foo"}}-{{.Field "bar"}}'
|
||||
// e.g. '{{.Name}}-{{.Tag "foo"}}-{{.Field "bar"}}'
|
||||
type TemplateMetric interface {
|
||||
Name() string
|
||||
Field(key string) interface{}
|
||||
|
|
@ -136,3 +136,9 @@ type TemplateMetric interface {
|
|||
Time() time.Time
|
||||
String() string
|
||||
}
|
||||
|
||||
type UnwrappableMetric interface {
|
||||
// Unwrap allows to access the underlying raw metric if an implementation
|
||||
// wraps it in the first place.
|
||||
Unwrap() Metric
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,8 +17,6 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
)
|
||||
|
||||
type unwrappableMetric interface{ Unwrap() telegraf.Metric }
|
||||
|
||||
//go:embed sample.conf
|
||||
var sampleConfig string
|
||||
|
||||
|
|
@ -68,7 +66,7 @@ func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
|||
out := make([]telegraf.Metric, 0, len(in))
|
||||
for _, raw := range in {
|
||||
m := raw
|
||||
if wm, ok := raw.(unwrappableMetric); ok {
|
||||
if wm, ok := raw.(telegraf.UnwrappableMetric); ok {
|
||||
m = wm.Unwrap()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,13 +29,17 @@ func (*TemplateProcessor) SampleConfig() string {
|
|||
|
||||
func (r *TemplateProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||
// for each metric in "in" array
|
||||
for _, metric := range in {
|
||||
m, ok := metric.(telegraf.TemplateMetric)
|
||||
for _, raw := range in {
|
||||
m := raw
|
||||
if wm, ok := raw.(telegraf.UnwrappableMetric); ok {
|
||||
m = wm.Unwrap()
|
||||
}
|
||||
tm, ok := m.(telegraf.TemplateMetric)
|
||||
if !ok {
|
||||
r.Log.Errorf("metric of type %T is not a template metric", metric)
|
||||
r.Log.Errorf("metric of type %T is not a template metric", raw)
|
||||
continue
|
||||
}
|
||||
newM := TemplateMetric{m}
|
||||
newM := TemplateMetric{tm}
|
||||
|
||||
var b strings.Builder
|
||||
if err := r.tmplTag.Execute(&b, &newM); err != nil {
|
||||
|
|
@ -51,8 +55,9 @@ func (r *TemplateProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
|||
}
|
||||
value := b.String()
|
||||
|
||||
metric.AddTag(tag, value)
|
||||
raw.AddTag(tag, value)
|
||||
}
|
||||
|
||||
return in
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,12 +1,14 @@
|
|||
package template
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
|
|
@ -263,3 +265,39 @@ func TestDot(t *testing.T) {
|
|||
expected.AddTag("metric", "test1 map[tag1:value1] map[value:1.23] 1257894000000000000")
|
||||
testutil.RequireMetricsEqual(t, []telegraf.Metric{expected}, actual)
|
||||
}
|
||||
|
||||
func TestTracking(t *testing.T) {
|
||||
// Create a tracking metric and tap the delivery information
|
||||
var mu sync.Mutex
|
||||
delivered := make([]telegraf.DeliveryInfo, 0, 1)
|
||||
notify := func(di telegraf.DeliveryInfo) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
delivered = append(delivered, di)
|
||||
}
|
||||
m := testutil.TestMetric(1.23)
|
||||
input, _ := metric.WithTracking(m, notify)
|
||||
|
||||
// Create an expectation
|
||||
e := m.Copy()
|
||||
e.AddTag("metric", "test1 map[tag1:value1] map[value:1.23] 1257894000000000000")
|
||||
expected := []telegraf.Metric{e}
|
||||
|
||||
// Configure the plugin
|
||||
plugin := TemplateProcessor{Tag: "metric", Template: "{{.}}"}
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Process expected metrics and compare with resulting metrics
|
||||
actual := plugin.Apply(input)
|
||||
testutil.RequireMetricsEqual(t, expected, actual)
|
||||
|
||||
// Simulate output acknowledging delivery
|
||||
input.Accept()
|
||||
|
||||
// Check delivery
|
||||
require.Eventuallyf(t, func() bool {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return len(delivered) > 0
|
||||
}, time.Second, 100*time.Millisecond, "%d delivered but 1 expected", len(delivered))
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue