feat: collection offset implementation (#10545)

This commit is contained in:
Sven Rebhan 2022-02-15 18:39:12 +01:00 committed by GitHub
parent f75f437d37
commit 5479df2eb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 259 additions and 80 deletions

View File

@ -298,11 +298,17 @@ func (a *Agent) runInputs(
jitter = input.Config.CollectionJitter
}
// Overwrite agent collection_offset if this plugin has its own.
offset := time.Duration(a.Config.Agent.CollectionOffset)
if input.Config.CollectionOffset != 0 {
offset = input.Config.CollectionOffset
}
var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
} else {
ticker = NewUnalignedTicker(interval, jitter)
ticker = NewUnalignedTicker(interval, jitter, offset)
}
defer ticker.Stop()

View File

@ -31,36 +31,38 @@ type Ticker interface {
type AlignedTicker struct {
interval time.Duration
jitter time.Duration
offset time.Duration
minInterval time.Duration
ch chan time.Time
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewAlignedTicker(now time.Time, interval, jitter time.Duration) *AlignedTicker {
return newAlignedTicker(now, interval, jitter, clock.New())
}
func newAlignedTicker(now time.Time, interval, jitter time.Duration, clock clock.Clock) *AlignedTicker {
ctx, cancel := context.WithCancel(context.Background())
func NewAlignedTicker(now time.Time, interval, jitter, offset time.Duration) *AlignedTicker {
t := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
ch: make(chan time.Time, 1),
cancel: cancel,
}
t.start(now, clock.New())
return t
}
func (t *AlignedTicker) start(now time.Time, clk clock.Clock) {
t.ch = make(chan time.Time, 1)
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
d := t.next(now)
timer := clock.Timer(d)
timer := clk.Timer(d)
t.wg.Add(1)
go func() {
defer t.wg.Done()
t.run(ctx, timer)
}()
return t
}
func (t *AlignedTicker) next(now time.Time) time.Duration {
@ -74,6 +76,7 @@ func (t *AlignedTicker) next(now time.Time) time.Duration {
if d == 0 {
d = t.interval
}
d += t.offset
d += internal.RandomDuration(t.jitter)
return d
}
@ -118,42 +121,48 @@ func (t *AlignedTicker) Stop() {
type UnalignedTicker struct {
interval time.Duration
jitter time.Duration
offset time.Duration
ch chan time.Time
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewUnalignedTicker(interval, jitter time.Duration) *UnalignedTicker {
return newUnalignedTicker(interval, jitter, clock.New())
}
func newUnalignedTicker(interval, jitter time.Duration, clock clock.Clock) *UnalignedTicker {
ctx, cancel := context.WithCancel(context.Background())
func NewUnalignedTicker(interval, jitter, offset time.Duration) *UnalignedTicker {
t := &UnalignedTicker{
interval: interval,
jitter: jitter,
ch: make(chan time.Time, 1),
cancel: cancel,
offset: offset,
}
t.start(clock.New())
return t
}
ticker := clock.Ticker(t.interval)
t.ch <- clock.Now()
func (t *UnalignedTicker) start(clk clock.Clock) *UnalignedTicker {
t.ch = make(chan time.Time, 1)
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
ticker := clk.Ticker(t.interval)
if t.offset == 0 {
// Perform initial trigger to stay backward compatible
t.ch <- clk.Now()
}
t.wg.Add(1)
go func() {
defer t.wg.Done()
t.run(ctx, ticker, clock)
t.run(ctx, ticker, clk)
}()
return t
}
func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error {
func sleep(ctx context.Context, duration time.Duration, clk clock.Clock) error {
if duration == 0 {
return nil
}
t := clock.Timer(duration)
t := clk.Timer(duration)
select {
case <-t.C:
return nil
@ -163,7 +172,7 @@ func sleep(ctx context.Context, duration time.Duration, clock clock.Clock) error
}
}
func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock clock.Clock) {
func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clk clock.Clock) {
for {
select {
case <-ctx.Done():
@ -171,13 +180,13 @@ func (t *UnalignedTicker) run(ctx context.Context, ticker *clock.Ticker, clock c
return
case <-ticker.C:
jitter := internal.RandomDuration(t.jitter)
err := sleep(ctx, jitter, clock)
err := sleep(ctx, t.offset+jitter, clk)
if err != nil {
ticker.Stop()
return
}
select {
case t.ch <- clock.Now():
case t.ch <- clk.Now():
default:
}
}
@ -217,20 +226,22 @@ type RollingTicker struct {
}
func NewRollingTicker(interval, jitter time.Duration) *RollingTicker {
return newRollingTicker(interval, jitter, clock.New())
}
func newRollingTicker(interval, jitter time.Duration, clock clock.Clock) *RollingTicker {
ctx, cancel := context.WithCancel(context.Background())
t := &RollingTicker{
interval: interval,
jitter: jitter,
ch: make(chan time.Time, 1),
cancel: cancel,
}
t.start(clock.New())
return t
}
func (t *RollingTicker) start(clk clock.Clock) *RollingTicker {
t.ch = make(chan time.Time, 1)
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
d := t.next()
timer := clock.Timer(d)
timer := clk.Timer(d)
t.wg.Add(1)
go func() {

View File

@ -13,12 +13,19 @@ import (
func TestAlignedTicker(t *testing.T) {
interval := 10 * time.Second
jitter := 0 * time.Second
offset := 0 * time.Second
clock := clock.NewMock()
since := clock.Now()
clk := clock.NewMock()
since := clk.Now()
until := since.Add(60 * time.Second)
ticker := newAlignedTicker(since, interval, jitter, clock)
ticker := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
}
ticker.start(since, clk)
defer ticker.Stop()
expected := []time.Time{
@ -32,13 +39,13 @@ func TestAlignedTicker(t *testing.T) {
actual := []time.Time{}
clock.Add(10 * time.Second)
for !clock.Now().After(until) {
clk.Add(10 * time.Second)
for !clk.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
actual = append(actual, tm.UTC())
}
clock.Add(10 * time.Second)
clk.Add(10 * time.Second)
}
require.Equal(t, expected, actual)
@ -47,16 +54,23 @@ func TestAlignedTicker(t *testing.T) {
func TestAlignedTickerJitter(t *testing.T) {
interval := 10 * time.Second
jitter := 5 * time.Second
offset := 0 * time.Second
clock := clock.NewMock()
since := clock.Now()
clk := clock.NewMock()
since := clk.Now()
until := since.Add(61 * time.Second)
ticker := newAlignedTicker(since, interval, jitter, clock)
ticker := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
}
ticker.start(since, clk)
defer ticker.Stop()
last := since
for !clock.Now().After(until) {
for !clk.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
dur := tm.Sub(last)
@ -66,24 +80,69 @@ func TestAlignedTickerJitter(t *testing.T) {
last = last.Add(interval)
default:
}
clock.Add(1 * time.Second)
clk.Add(1 * time.Second)
}
}
func TestAlignedTickerOffset(t *testing.T) {
interval := 10 * time.Second
jitter := 0 * time.Second
offset := 3 * time.Second
clk := clock.NewMock()
since := clk.Now()
until := since.Add(61 * time.Second)
ticker := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
}
ticker.start(since, clk)
defer ticker.Stop()
expected := []time.Time{
time.Unix(13, 0).UTC(),
time.Unix(23, 0).UTC(),
time.Unix(33, 0).UTC(),
time.Unix(43, 0).UTC(),
time.Unix(53, 0).UTC(),
}
actual := []time.Time{}
clk.Add(10*time.Second + offset)
for !clk.Now().After(until) {
tm := <-ticker.Elapsed()
actual = append(actual, tm.UTC())
clk.Add(10 * time.Second)
}
require.Equal(t, expected, actual)
}
func TestAlignedTickerMissedTick(t *testing.T) {
interval := 10 * time.Second
jitter := 0 * time.Second
offset := 0 * time.Second
clock := clock.NewMock()
since := clock.Now()
clk := clock.NewMock()
since := clk.Now()
ticker := newAlignedTicker(since, interval, jitter, clock)
ticker := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
}
ticker.start(since, clk)
defer ticker.Stop()
clock.Add(25 * time.Second)
clk.Add(25 * time.Second)
tm := <-ticker.Elapsed()
require.Equal(t, time.Unix(10, 0).UTC(), tm.UTC())
clock.Add(5 * time.Second)
clk.Add(5 * time.Second)
tm = <-ticker.Elapsed()
require.Equal(t, time.Unix(30, 0).UTC(), tm.UTC())
}
@ -91,13 +150,19 @@ func TestAlignedTickerMissedTick(t *testing.T) {
func TestUnalignedTicker(t *testing.T) {
interval := 10 * time.Second
jitter := 0 * time.Second
offset := 0 * time.Second
clock := clock.NewMock()
clock.Add(1 * time.Second)
since := clock.Now()
clk := clock.NewMock()
clk.Add(1 * time.Second)
since := clk.Now()
until := since.Add(60 * time.Second)
ticker := newUnalignedTicker(interval, jitter, clock)
ticker := &UnalignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
}
ticker.start(clk)
defer ticker.Stop()
expected := []time.Time{
@ -111,13 +176,13 @@ func TestUnalignedTicker(t *testing.T) {
}
actual := []time.Time{}
for !clock.Now().After(until) {
for !clk.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
actual = append(actual, tm.UTC())
default:
}
clock.Add(10 * time.Second)
clk.Add(10 * time.Second)
}
require.Equal(t, expected, actual)
@ -126,13 +191,19 @@ func TestUnalignedTicker(t *testing.T) {
func TestRollingTicker(t *testing.T) {
interval := 10 * time.Second
jitter := 0 * time.Second
offset := 0 * time.Second
clock := clock.NewMock()
clock.Add(1 * time.Second)
since := clock.Now()
clk := clock.NewMock()
clk.Add(1 * time.Second)
since := clk.Now()
until := since.Add(60 * time.Second)
ticker := newUnalignedTicker(interval, jitter, clock)
ticker := &UnalignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
}
ticker.start(clk)
defer ticker.Stop()
expected := []time.Time{
@ -146,13 +217,13 @@ func TestRollingTicker(t *testing.T) {
}
actual := []time.Time{}
for !clock.Now().After(until) {
for !clk.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
actual = append(actual, tm.UTC())
default:
}
clock.Add(10 * time.Second)
clk.Add(10 * time.Second)
}
require.Equal(t, expected, actual)
@ -167,13 +238,46 @@ func TestAlignedTickerDistribution(t *testing.T) {
interval := 10 * time.Second
jitter := 5 * time.Second
offset := 0 * time.Second
clock := clock.NewMock()
since := clock.Now()
clk := clock.NewMock()
since := clk.Now()
ticker := newAlignedTicker(since, interval, jitter, clock)
ticker := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
}
ticker.start(since, clk)
defer ticker.Stop()
dist := simulatedDist(ticker, clock)
dist := simulatedDist(ticker, clk)
printDist(dist)
require.True(t, 350 < dist.Count)
require.True(t, 9 < dist.Mean() && dist.Mean() < 11)
}
func TestAlignedTickerDistributionWithOffset(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
interval := 10 * time.Second
jitter := 5 * time.Second
offset := 3 * time.Second
clk := clock.NewMock()
since := clk.Now()
ticker := &AlignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
minInterval: interval / 100,
}
ticker.start(since, clk)
defer ticker.Stop()
dist := simulatedDist(ticker, clk)
printDist(dist)
require.True(t, 350 < dist.Count)
require.True(t, 9 < dist.Mean() && dist.Mean() < 11)
@ -188,12 +292,42 @@ func TestUnalignedTickerDistribution(t *testing.T) {
interval := 10 * time.Second
jitter := 5 * time.Second
offset := 0 * time.Second
clock := clock.NewMock()
clk := clock.NewMock()
ticker := newUnalignedTicker(interval, jitter, clock)
ticker := &UnalignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
}
ticker.start(clk)
defer ticker.Stop()
dist := simulatedDist(ticker, clock)
dist := simulatedDist(ticker, clk)
printDist(dist)
require.True(t, 350 < dist.Count)
require.True(t, 9 < dist.Mean() && dist.Mean() < 11)
}
func TestUnalignedTickerDistributionWithOffset(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
interval := 10 * time.Second
jitter := 5 * time.Second
offset := 3 * time.Second
clk := clock.NewMock()
ticker := &UnalignedTicker{
interval: interval,
jitter: jitter,
offset: offset,
}
ticker.start(clk)
defer ticker.Stop()
dist := simulatedDist(ticker, clk)
printDist(dist)
require.True(t, 350 < dist.Count)
require.True(t, 9 < dist.Mean() && dist.Mean() < 11)
@ -209,11 +343,15 @@ func TestRollingTickerDistribution(t *testing.T) {
interval := 10 * time.Second
jitter := 5 * time.Second
clock := clock.NewMock()
clk := clock.NewMock()
ticker := newRollingTicker(interval, jitter, clock)
ticker := &RollingTicker{
interval: interval,
jitter: jitter,
}
ticker.start(clk)
defer ticker.Stop()
dist := simulatedDist(ticker, clock)
dist := simulatedDist(ticker, clk)
printDist(dist)
require.True(t, 275 < dist.Count)
require.True(t, 12 < dist.Mean() && 13 > dist.Mean())
@ -237,14 +375,14 @@ func printDist(dist Distribution) {
fmt.Printf("Count: %d\n", dist.Count)
}
func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution {
since := clock.Now()
func simulatedDist(ticker Ticker, clk *clock.Mock) Distribution {
since := clk.Now()
until := since.Add(1 * time.Hour)
var dist Distribution
last := clock.Now()
for !clock.Now().After(until) {
last := clk.Now()
for !clk.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
dist.Buckets[tm.Second()]++
@ -252,7 +390,7 @@ func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution {
dist.Waittime += tm.Sub(last).Seconds()
last = tm
default:
clock.Add(1 * time.Second)
clk.Add(1 * time.Second)
}
}

View File

@ -153,6 +153,11 @@ type AgentConfig struct {
// same time, which can have a measurable effect on the system.
CollectionJitter Duration
// CollectionOffset is used to shift the collection by the given amount.
// This can be be used to avoid many plugins querying constraint devices
// at the same time by manually scheduling them in time.
CollectionOffset Duration
// FlushInterval is the Interval at which to flush data
FlushInterval Duration
@ -322,6 +327,7 @@ var globalTagsConfig = `
# user = "$USER"
`
var agentConfig = `
# Configuration for telegraf agent
[agent]
@ -347,6 +353,11 @@ var agentConfig = `
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Collection offset is used to shift the collection by the given amount.
## This can be be used to avoid many plugins querying constraint devices
## at the same time by manually scheduling them in time.
# collection_offset = "0s"
## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
@ -1488,6 +1499,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
c.getFieldDuration(tbl, "interval", &cp.Interval)
c.getFieldDuration(tbl, "precision", &cp.Precision)
c.getFieldDuration(tbl, "collection_jitter", &cp.CollectionJitter)
c.getFieldDuration(tbl, "collection_offset", &cp.CollectionOffset)
c.getFieldString(tbl, "name_prefix", &cp.MeasurementPrefix)
c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix)
c.getFieldString(tbl, "name_override", &cp.NameOverride)
@ -1793,6 +1805,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"collection_offset",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",

View File

@ -188,6 +188,11 @@ The agent table configures Telegraf and the defaults used across all plugins.
This can be used to avoid many plugins querying things like sysfs at the
same time, which can have a measurable effect on the system.
- **collection_offset**:
Collection offset is used to shift the collection by the given [interval][].
This can be be used to avoid many plugins querying constraint devices
at the same time by manually scheduling them in time.
- **flush_interval**:
Default flushing [interval][] for all outputs. Maximum flush_interval will be
flush_interval + flush_jitter.
@ -281,6 +286,11 @@ Parameters that can be used with any input plugin:
plugin. Collection jitter is used to jitter the collection by a random
[interval][].
- **collection_offset**:
Overrides the `collection_offset` setting of the [agent][Agent] for the
plugin. Collection offset is used to shift the collection by the given
[interval][].
- **name_override**: Override the base name of the measurement. (Default is
the name of the input).

View File

@ -60,6 +60,7 @@ type InputConfig struct {
Alias string
Interval time.Duration
CollectionJitter time.Duration
CollectionOffset time.Duration
Precision time.Duration
NameOverride string