diff --git a/config/config.go b/config/config.go index 2b1cf117c..008316fc5 100644 --- a/config/config.go +++ b/config/config.go @@ -1488,6 +1488,8 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e cp.CollectionJitter, _ = c.getFieldDuration(tbl, "collection_jitter") cp.CollectionOffset, _ = c.getFieldDuration(tbl, "collection_offset") cp.StartupErrorBehavior = c.getFieldString(tbl, "startup_error_behavior") + cp.TimeSource = c.getFieldString(tbl, "time_source") + cp.MeasurementPrefix = c.getFieldString(tbl, "name_prefix") cp.MeasurementSuffix = c.getFieldString(tbl, "name_suffix") cp.NameOverride = c.getFieldString(tbl, "name_override") diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 313c6fa91..c87c6e233 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -396,6 +396,14 @@ Parameters that can be used with any input plugin: When this value is set on a service input, multiple events occurring at the same timestamp may be merged by the output database. +- **time_source**: + Specifies the source of the timestamp on metrics. Possible values are: + - `metric` will not alter the metric (default) + - `collection_start` sets the timestamp to when collection started + - `collection_end` set the timestamp to when collection finished + + `time_source` will NOT be used for service inputs. It is up to each individual + service input to set the timestamp. - **collection_jitter**: Overrides the `collection_jitter` setting of the [agent][Agent] for the plugin. Collection jitter is used to jitter the collection by a random diff --git a/models/running_input.go b/models/running_input.go index 8a5ff0607..7858d0e3e 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -24,9 +24,11 @@ type RunningInput struct { log telegraf.Logger defaultTags map[string]string - startAcc telegraf.Accumulator - started bool - retries uint64 + startAcc telegraf.Accumulator + started bool + retries uint64 + gatherStart time.Time + gatherEnd time.Time MetricsGathered selfstat.Stat GatherTime selfstat.Stat @@ -87,6 +89,7 @@ type InputConfig struct { CollectionJitter time.Duration CollectionOffset time.Duration Precision time.Duration + TimeSource string StartupErrorBehavior string LogLevel string @@ -114,6 +117,14 @@ func (r *RunningInput) Init() error { return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior) } + switch r.Config.TimeSource { + case "": + r.Config.TimeSource = "metric" + case "metric", "collection_start", "collection_end": + default: + return fmt.Errorf("invalid 'time_source' setting %q", r.Config.TimeSource) + } + if p, ok := r.Input.(telegraf.Initializer); ok { return p.Init() } @@ -206,6 +217,14 @@ func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric { makemetric(metric, "", "", "", local, global) } + switch r.Config.TimeSource { + case "collection_start": + metric.SetTime(r.gatherStart) + case "collection_end": + metric.SetTime(r.gatherEnd) + default: + } + r.MetricsGathered.Incr(1) GlobalMetricsGathered.Incr(1) return metric @@ -228,10 +247,11 @@ func (r *RunningInput) Gather(acc telegraf.Accumulator) error { } } - start := time.Now() + r.gatherStart = time.Now() err := r.Input.Gather(acc) - elapsed := time.Since(start) - r.GatherTime.Incr(elapsed.Nanoseconds()) + r.gatherEnd = time.Now() + + r.GatherTime.Incr(r.gatherEnd.Sub(r.gatherStart).Nanoseconds()) return err } diff --git a/models/running_input_test.go b/models/running_input_test.go index 877cc2bd3..9bf7d6d9e 100644 --- a/models/running_input_test.go +++ b/models/running_input_test.go @@ -428,6 +428,65 @@ func TestMakeMetricWithAlwaysKeepingPluginTagsEnabled(t *testing.T) { require.Equal(t, expected, actual) } +func TestMakeMetricWithGatherMetricTimeSource(t *testing.T) { + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: make(map[string]string), + Filter: Filter{}, + AlwaysIncludeLocalTags: false, + AlwaysIncludeGlobalTags: false, + TimeSource: "metric", + }) + start := time.Now() + ri.gatherStart = start + ri.gatherEnd = start.Add(time.Second) + + expected := testutil.MockMetrics()[0] + + m := testutil.MockMetrics()[0] + actual := ri.MakeMetric(m) + + require.Equal(t, expected, actual) +} + +func TestMakeMetricWithGatherStartTimeSource(t *testing.T) { + start := time.Now() + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + Tags: make(map[string]string), + Filter: Filter{}, + AlwaysIncludeLocalTags: false, + AlwaysIncludeGlobalTags: false, + TimeSource: "collection_start", + }) + ri.gatherStart = start + + expected := testutil.MockMetrics()[0] + expected.SetTime(start) + + m := testutil.MockMetrics()[0] + actual := ri.MakeMetric(m) + + require.Equal(t, expected, actual) +} + +func TestMakeMetricWithGatherEndTimeSource(t *testing.T) { + end := time.Now() + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestRunningInput", + TimeSource: "collection_end", + }) + ri.gatherEnd = end + + expected := testutil.MockMetrics()[0] + expected.SetTime(end) + + m := testutil.MockMetrics()[0] + actual := ri.MakeMetric(m) + + require.Equal(t, expected, actual) +} + type testInput struct{} func (t *testInput) Description() string { return "" }