Allow overriding the collection_jitter and precision per input (#7762)

This commit is contained in:
Daniel Nelson 2020-06-30 23:15:11 -07:00 committed by GitHub
parent 81ec33c560
commit 0fbe0b3968
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 111 additions and 95 deletions

View File

@ -246,12 +246,20 @@ func (a *Agent) startInputs(
for _, input := range inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not subject to timestamp rounding.
// Service input plugins are not normally subject to timestamp
// rounding except for when precision is set on the input plugin.
//
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
// precision and interval agent/plugin settings.
var interval time.Duration
var precision time.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}
acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)
acc.SetPrecision(getPrecision(precision, interval))
err := si.Start(acc)
if err != nil {
@ -276,14 +284,24 @@ func (a *Agent) runInputs(
) error {
var wg sync.WaitGroup
for _, input := range unit.inputs {
interval := a.Config.Agent.Interval.Duration
jitter := a.Config.Agent.CollectionJitter.Duration
// Overwrite agent interval if this plugin has its own.
interval := a.Config.Agent.Interval.Duration
if input.Config.Interval != 0 {
interval = input.Config.Interval
}
// Overwrite agent precision if this plugin has its own.
precision := a.Config.Agent.Precision.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}
// Overwrite agent collection_jitter if this plugin has its own.
jitter := a.Config.Agent.CollectionJitter.Duration
if input.Config.CollectionJitter != 0 {
jitter = input.Config.CollectionJitter
}
var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
@ -293,7 +311,7 @@ func (a *Agent) runInputs(
defer ticker.Stop()
acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))
wg.Add(1)
go func(input *models.RunningInput) {
@ -368,12 +386,24 @@ func (a *Agent) testRunInputs(
go func(input *models.RunningInput) {
defer wg.Done()
// Overwrite agent interval if this plugin has its own.
interval := a.Config.Agent.Interval.Duration
if input.Config.Interval != 0 {
interval = input.Config.Interval
}
// Overwrite agent precision if this plugin has its own.
precision := a.Config.Agent.Precision.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}
// Run plugins that require multiple gathers to calculate rate
// and delta metrics twice.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(a.Precision())
nulAcc.SetPrecision(getPrecision(precision, interval))
if err := input.Input.Gather(nulAcc); err != nil {
nulAcc.AddError(err)
}
@ -382,7 +412,7 @@ func (a *Agent) testRunInputs(
}
acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
@ -580,8 +610,11 @@ func (a *Agent) runAggregators(
go func(agg *models.RunningAggregator) {
defer wg.Done()
interval := a.Config.Agent.Interval.Duration
precision := a.Config.Agent.Precision.Duration
acc := NewAccumulator(agg, unit.aggC)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))
a.push(ctx, agg, acc)
}(agg)
}
@ -705,8 +738,8 @@ func (a *Agent) runOutputs(
jitter := jitter
// Overwrite agent flush_jitter if this plugin has its own.
if output.Config.FlushJitter != nil {
jitter = *output.Config.FlushJitter
if output.Config.FlushJitter != 0 {
jitter = output.Config.FlushJitter
}
wg.Add(1)
@ -1063,10 +1096,7 @@ func (a *Agent) once(ctx context.Context, wait time.Duration) error {
}
// Returns the rounding precision for metrics.
func (a *Agent) Precision() time.Duration {
precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration
func getPrecision(precision, interval time.Duration) time.Duration {
if precision > 0 {
return precision
}

View File

@ -1075,44 +1075,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
Grace: time.Second * 0,
}
if node, ok := tbl.Fields["period"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
conf.Period = dur
}
}
if err := getConfigDuration(tbl, "period", &conf.Period); err != nil {
return nil, err
}
if node, ok := tbl.Fields["delay"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
conf.Delay = dur
}
}
if err := getConfigDuration(tbl, "delay", &conf.Delay); err != nil {
return nil, err
}
if node, ok := tbl.Fields["grace"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
conf.Grace = dur
}
}
if err := getConfigDuration(tbl, "grace", &conf.Grace); err != nil {
return nil, err
}
if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
@ -1166,9 +1140,6 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}
delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "grace")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
@ -1361,17 +1332,17 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
// models.InputConfig to be inserted into models.RunningInput
func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
cp := &models.InputConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
cp.Interval = dur
}
}
if err := getConfigDuration(tbl, "interval", &cp.Interval); err != nil {
return nil, err
}
if err := getConfigDuration(tbl, "precision", &cp.Precision); err != nil {
return nil, err
}
if err := getConfigDuration(tbl, "collection_jitter", &cp.CollectionJitter); err != nil {
return nil, err
}
if node, ok := tbl.Fields["name_prefix"]; ok {
@ -1419,7 +1390,6 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "alias")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
var err error
cp.Filter, err = buildFilter(tbl)
@ -2141,30 +2111,12 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
oc.Filter.NamePass = oc.Filter.FieldPass
}
if node, ok := tbl.Fields["flush_interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
oc.FlushInterval = dur
}
}
if err := getConfigDuration(tbl, "flush_interval", &oc.FlushInterval); err != nil {
return nil, err
}
if node, ok := tbl.Fields["flush_jitter"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
oc.FlushJitter = new(time.Duration)
*oc.FlushJitter = dur
}
}
if err := getConfigDuration(tbl, "flush_jitter", &oc.FlushJitter); err != nil {
return nil, err
}
if node, ok := tbl.Fields["metric_buffer_limit"]; ok {
@ -2223,8 +2175,6 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
}
}
delete(tbl.Fields, "flush_interval")
delete(tbl.Fields, "flush_jitter")
delete(tbl.Fields, "metric_buffer_limit")
delete(tbl.Fields, "metric_batch_size")
delete(tbl.Fields, "alias")
@ -2241,3 +2191,19 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
type unwrappable interface {
Unwrap() telegraf.Processor
}
func getConfigDuration(tbl *ast.Table, key string, target *time.Duration) error {
if node, ok := tbl.Fields[key]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
d, err := time.ParseDuration(str.Value)
if err != nil {
return err
}
delete(tbl.Fields, key)
*target = d
}
}
}
return nil
}

View File

@ -132,7 +132,6 @@ The agent table configures Telegraf and the defaults used across all plugins.
running a large number of telegraf instances. ie, a jitter of 5s and interval
10s means flushes will happen every 10-15s.
- **precision**:
Collected metrics are rounded to the precision specified as an [interval][].
@ -194,13 +193,32 @@ driven operation.
Parameters that can be used with any input plugin:
- **alias**: Name an instance of a plugin.
- **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular input should be run less or more
often, you can configure that here.
- **interval**:
Overrides the `interval` setting of the [agent][Agent] for the plugin. How
often to gather this metric. Normal plugins use a single global interval, but
if one particular input should be run less or more often, you can configure
that here.
- **precision**:
Overrides the `precision` setting of the [agent][Agent] for the plugin.
Collected metrics are rounded to the precision specified as an [interval][].
When this value is set on a service input, multiple events occuring at the
same timestamp may be merged by the output database.
- **collection_jitter**:
Overrides the `collection_jitter` setting of the [agent][Agent] for the
plugin. Collection jitter is used to jitter the collection by a random
[interval][].
- **name_override**: Override the base name of the measurement. (Default is
the name of the input).
- **name_prefix**: Specifies a prefix to attach to the measurement name.
- **name_suffix**: Specifies a suffix to attach to the measurement name.
- **tags**: A map of tags to apply to a specific input's measurements.
The [metric filtering][] parameters can be used to limit what metrics are

View File

@ -56,9 +56,11 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
// InputConfig is the common config for all inputs.
type InputConfig struct {
Name string
Alias string
Interval time.Duration
Name string
Alias string
Interval time.Duration
CollectionJitter time.Duration
Precision time.Duration
NameOverride string
MeasurementPrefix string

View File

@ -24,7 +24,7 @@ type OutputConfig struct {
Filter Filter
FlushInterval time.Duration
FlushJitter *time.Duration
FlushJitter time.Duration
MetricBufferLimit int
MetricBatchSize int