feat(inputs): Add option to choose the metric time source (#15917)
This commit is contained in:
parent
b029889212
commit
56f2d6e1bb
|
|
@ -1488,6 +1488,8 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
|
||||||
cp.CollectionJitter, _ = c.getFieldDuration(tbl, "collection_jitter")
|
cp.CollectionJitter, _ = c.getFieldDuration(tbl, "collection_jitter")
|
||||||
cp.CollectionOffset, _ = c.getFieldDuration(tbl, "collection_offset")
|
cp.CollectionOffset, _ = c.getFieldDuration(tbl, "collection_offset")
|
||||||
cp.StartupErrorBehavior = c.getFieldString(tbl, "startup_error_behavior")
|
cp.StartupErrorBehavior = c.getFieldString(tbl, "startup_error_behavior")
|
||||||
|
cp.TimeSource = c.getFieldString(tbl, "time_source")
|
||||||
|
|
||||||
cp.MeasurementPrefix = c.getFieldString(tbl, "name_prefix")
|
cp.MeasurementPrefix = c.getFieldString(tbl, "name_prefix")
|
||||||
cp.MeasurementSuffix = c.getFieldString(tbl, "name_suffix")
|
cp.MeasurementSuffix = c.getFieldString(tbl, "name_suffix")
|
||||||
cp.NameOverride = c.getFieldString(tbl, "name_override")
|
cp.NameOverride = c.getFieldString(tbl, "name_override")
|
||||||
|
|
|
||||||
|
|
@ -396,6 +396,14 @@ Parameters that can be used with any input plugin:
|
||||||
|
|
||||||
When this value is set on a service input, multiple events occurring at the
|
When this value is set on a service input, multiple events occurring at the
|
||||||
same timestamp may be merged by the output database.
|
same timestamp may be merged by the output database.
|
||||||
|
- **time_source**:
|
||||||
|
Specifies the source of the timestamp on metrics. Possible values are:
|
||||||
|
- `metric` will not alter the metric (default)
|
||||||
|
- `collection_start` sets the timestamp to when collection started
|
||||||
|
- `collection_end` set the timestamp to when collection finished
|
||||||
|
|
||||||
|
`time_source` will NOT be used for service inputs. It is up to each individual
|
||||||
|
service input to set the timestamp.
|
||||||
- **collection_jitter**:
|
- **collection_jitter**:
|
||||||
Overrides the `collection_jitter` setting of the [agent][Agent] for the
|
Overrides the `collection_jitter` setting of the [agent][Agent] for the
|
||||||
plugin. Collection jitter is used to jitter the collection by a random
|
plugin. Collection jitter is used to jitter the collection by a random
|
||||||
|
|
|
||||||
|
|
@ -24,9 +24,11 @@ type RunningInput struct {
|
||||||
log telegraf.Logger
|
log telegraf.Logger
|
||||||
defaultTags map[string]string
|
defaultTags map[string]string
|
||||||
|
|
||||||
startAcc telegraf.Accumulator
|
startAcc telegraf.Accumulator
|
||||||
started bool
|
started bool
|
||||||
retries uint64
|
retries uint64
|
||||||
|
gatherStart time.Time
|
||||||
|
gatherEnd time.Time
|
||||||
|
|
||||||
MetricsGathered selfstat.Stat
|
MetricsGathered selfstat.Stat
|
||||||
GatherTime selfstat.Stat
|
GatherTime selfstat.Stat
|
||||||
|
|
@ -87,6 +89,7 @@ type InputConfig struct {
|
||||||
CollectionJitter time.Duration
|
CollectionJitter time.Duration
|
||||||
CollectionOffset time.Duration
|
CollectionOffset time.Duration
|
||||||
Precision time.Duration
|
Precision time.Duration
|
||||||
|
TimeSource string
|
||||||
StartupErrorBehavior string
|
StartupErrorBehavior string
|
||||||
LogLevel string
|
LogLevel string
|
||||||
|
|
||||||
|
|
@ -114,6 +117,14 @@ func (r *RunningInput) Init() error {
|
||||||
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
|
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch r.Config.TimeSource {
|
||||||
|
case "":
|
||||||
|
r.Config.TimeSource = "metric"
|
||||||
|
case "metric", "collection_start", "collection_end":
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid 'time_source' setting %q", r.Config.TimeSource)
|
||||||
|
}
|
||||||
|
|
||||||
if p, ok := r.Input.(telegraf.Initializer); ok {
|
if p, ok := r.Input.(telegraf.Initializer); ok {
|
||||||
return p.Init()
|
return p.Init()
|
||||||
}
|
}
|
||||||
|
|
@ -206,6 +217,14 @@ func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
|
||||||
makemetric(metric, "", "", "", local, global)
|
makemetric(metric, "", "", "", local, global)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch r.Config.TimeSource {
|
||||||
|
case "collection_start":
|
||||||
|
metric.SetTime(r.gatherStart)
|
||||||
|
case "collection_end":
|
||||||
|
metric.SetTime(r.gatherEnd)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
r.MetricsGathered.Incr(1)
|
r.MetricsGathered.Incr(1)
|
||||||
GlobalMetricsGathered.Incr(1)
|
GlobalMetricsGathered.Incr(1)
|
||||||
return metric
|
return metric
|
||||||
|
|
@ -228,10 +247,11 @@ func (r *RunningInput) Gather(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
r.gatherStart = time.Now()
|
||||||
err := r.Input.Gather(acc)
|
err := r.Input.Gather(acc)
|
||||||
elapsed := time.Since(start)
|
r.gatherEnd = time.Now()
|
||||||
r.GatherTime.Incr(elapsed.Nanoseconds())
|
|
||||||
|
r.GatherTime.Incr(r.gatherEnd.Sub(r.gatherStart).Nanoseconds())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -428,6 +428,65 @@ func TestMakeMetricWithAlwaysKeepingPluginTagsEnabled(t *testing.T) {
|
||||||
require.Equal(t, expected, actual)
|
require.Equal(t, expected, actual)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMakeMetricWithGatherMetricTimeSource(t *testing.T) {
|
||||||
|
ri := NewRunningInput(&testInput{}, &InputConfig{
|
||||||
|
Name: "TestRunningInput",
|
||||||
|
Tags: make(map[string]string),
|
||||||
|
Filter: Filter{},
|
||||||
|
AlwaysIncludeLocalTags: false,
|
||||||
|
AlwaysIncludeGlobalTags: false,
|
||||||
|
TimeSource: "metric",
|
||||||
|
})
|
||||||
|
start := time.Now()
|
||||||
|
ri.gatherStart = start
|
||||||
|
ri.gatherEnd = start.Add(time.Second)
|
||||||
|
|
||||||
|
expected := testutil.MockMetrics()[0]
|
||||||
|
|
||||||
|
m := testutil.MockMetrics()[0]
|
||||||
|
actual := ri.MakeMetric(m)
|
||||||
|
|
||||||
|
require.Equal(t, expected, actual)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMakeMetricWithGatherStartTimeSource(t *testing.T) {
|
||||||
|
start := time.Now()
|
||||||
|
ri := NewRunningInput(&testInput{}, &InputConfig{
|
||||||
|
Name: "TestRunningInput",
|
||||||
|
Tags: make(map[string]string),
|
||||||
|
Filter: Filter{},
|
||||||
|
AlwaysIncludeLocalTags: false,
|
||||||
|
AlwaysIncludeGlobalTags: false,
|
||||||
|
TimeSource: "collection_start",
|
||||||
|
})
|
||||||
|
ri.gatherStart = start
|
||||||
|
|
||||||
|
expected := testutil.MockMetrics()[0]
|
||||||
|
expected.SetTime(start)
|
||||||
|
|
||||||
|
m := testutil.MockMetrics()[0]
|
||||||
|
actual := ri.MakeMetric(m)
|
||||||
|
|
||||||
|
require.Equal(t, expected, actual)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMakeMetricWithGatherEndTimeSource(t *testing.T) {
|
||||||
|
end := time.Now()
|
||||||
|
ri := NewRunningInput(&testInput{}, &InputConfig{
|
||||||
|
Name: "TestRunningInput",
|
||||||
|
TimeSource: "collection_end",
|
||||||
|
})
|
||||||
|
ri.gatherEnd = end
|
||||||
|
|
||||||
|
expected := testutil.MockMetrics()[0]
|
||||||
|
expected.SetTime(end)
|
||||||
|
|
||||||
|
m := testutil.MockMetrics()[0]
|
||||||
|
actual := ri.MakeMetric(m)
|
||||||
|
|
||||||
|
require.Equal(t, expected, actual)
|
||||||
|
}
|
||||||
|
|
||||||
type testInput struct{}
|
type testInput struct{}
|
||||||
|
|
||||||
func (t *testInput) Description() string { return "" }
|
func (t *testInput) Description() string { return "" }
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue