2018-11-06 05:34:28 +08:00
|
|
|
package models
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
"github.com/influxdata/telegraf"
|
|
|
|
|
"github.com/influxdata/telegraf/selfstat"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{})
|
|
|
|
|
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Buffer stores metrics in a circular buffer.
|
|
|
|
|
type Buffer struct {
|
|
|
|
|
sync.Mutex
|
|
|
|
|
buf []telegraf.Metric
|
|
|
|
|
first int // index of the first/oldest metric
|
|
|
|
|
last int // one after the index of the last/newest metric
|
|
|
|
|
size int // number of metrics currently in the buffer
|
|
|
|
|
cap int // the capacity of the buffer
|
|
|
|
|
|
|
|
|
|
batchFirst int // index of the first metric in the batch
|
2019-01-16 03:48:52 +08:00
|
|
|
batchSize int // number of metrics currently in the batch
|
2018-11-06 05:34:28 +08:00
|
|
|
|
|
|
|
|
MetricsAdded selfstat.Stat
|
|
|
|
|
MetricsWritten selfstat.Stat
|
|
|
|
|
MetricsDropped selfstat.Stat
|
2019-01-23 05:43:51 +08:00
|
|
|
BufferSize selfstat.Stat
|
|
|
|
|
BufferLimit selfstat.Stat
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewBuffer returns a new empty Buffer with the given capacity.
|
2019-08-22 09:02:51 +08:00
|
|
|
func NewBuffer(name string, alias string, capacity int) *Buffer {
|
2020-03-11 06:19:32 +08:00
|
|
|
tags := map[string]string{"output": name}
|
|
|
|
|
if alias != "" {
|
|
|
|
|
tags["alias"] = alias
|
|
|
|
|
}
|
|
|
|
|
|
2018-11-06 05:34:28 +08:00
|
|
|
b := &Buffer{
|
|
|
|
|
buf: make([]telegraf.Metric, capacity),
|
|
|
|
|
first: 0,
|
|
|
|
|
last: 0,
|
|
|
|
|
size: 0,
|
|
|
|
|
cap: capacity,
|
|
|
|
|
|
|
|
|
|
MetricsAdded: selfstat.Register(
|
|
|
|
|
"write",
|
|
|
|
|
"metrics_added",
|
2020-03-11 06:19:32 +08:00
|
|
|
tags,
|
2018-11-06 05:34:28 +08:00
|
|
|
),
|
|
|
|
|
MetricsWritten: selfstat.Register(
|
|
|
|
|
"write",
|
|
|
|
|
"metrics_written",
|
2020-03-11 06:19:32 +08:00
|
|
|
tags,
|
2018-11-06 05:34:28 +08:00
|
|
|
),
|
|
|
|
|
MetricsDropped: selfstat.Register(
|
|
|
|
|
"write",
|
|
|
|
|
"metrics_dropped",
|
2020-03-11 06:19:32 +08:00
|
|
|
tags,
|
2018-11-06 05:34:28 +08:00
|
|
|
),
|
2019-01-23 05:43:51 +08:00
|
|
|
BufferSize: selfstat.Register(
|
|
|
|
|
"write",
|
|
|
|
|
"buffer_size",
|
2020-03-11 06:19:32 +08:00
|
|
|
tags,
|
2019-01-23 05:43:51 +08:00
|
|
|
),
|
|
|
|
|
BufferLimit: selfstat.Register(
|
|
|
|
|
"write",
|
|
|
|
|
"buffer_limit",
|
2020-03-11 06:19:32 +08:00
|
|
|
tags,
|
2019-01-23 05:43:51 +08:00
|
|
|
),
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
2019-01-23 05:43:51 +08:00
|
|
|
b.BufferSize.Set(int64(0))
|
|
|
|
|
b.BufferLimit.Set(int64(capacity))
|
2018-11-06 05:34:28 +08:00
|
|
|
return b
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Len returns the number of metrics currently in the buffer.
|
|
|
|
|
func (b *Buffer) Len() int {
|
|
|
|
|
b.Lock()
|
|
|
|
|
defer b.Unlock()
|
|
|
|
|
|
2019-01-23 05:43:51 +08:00
|
|
|
return b.length()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *Buffer) length() int {
|
|
|
|
|
return min(b.size+b.batchSize, b.cap)
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *Buffer) metricAdded() {
|
|
|
|
|
b.MetricsAdded.Incr(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *Buffer) metricWritten(metric telegraf.Metric) {
|
|
|
|
|
AgentMetricsWritten.Incr(1)
|
|
|
|
|
b.MetricsWritten.Incr(1)
|
|
|
|
|
metric.Accept()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *Buffer) metricDropped(metric telegraf.Metric) {
|
|
|
|
|
AgentMetricsDropped.Incr(1)
|
|
|
|
|
b.MetricsDropped.Incr(1)
|
|
|
|
|
metric.Reject()
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-13 03:19:47 +08:00
|
|
|
func (b *Buffer) addMetric(m telegraf.Metric) int {
|
2019-06-06 03:34:45 +08:00
|
|
|
dropped := 0
|
2018-11-06 05:34:28 +08:00
|
|
|
// Check if Buffer is full
|
|
|
|
|
if b.size == b.cap {
|
2019-01-16 03:48:52 +08:00
|
|
|
b.metricDropped(b.buf[b.last])
|
2019-06-06 03:34:45 +08:00
|
|
|
dropped++
|
2019-01-16 03:48:52 +08:00
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
if b.batchSize > 0 {
|
2018-11-06 05:34:28 +08:00
|
|
|
b.batchSize--
|
2019-01-16 03:48:52 +08:00
|
|
|
b.batchFirst = b.next(b.batchFirst)
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.metricAdded()
|
|
|
|
|
|
|
|
|
|
b.buf[b.last] = m
|
2019-01-16 03:48:52 +08:00
|
|
|
b.last = b.next(b.last)
|
2018-11-06 05:34:28 +08:00
|
|
|
|
|
|
|
|
if b.size == b.cap {
|
2019-01-16 03:48:52 +08:00
|
|
|
b.first = b.next(b.first)
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.size = min(b.size+1, b.cap)
|
2019-06-06 03:34:45 +08:00
|
|
|
return dropped
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
|
2019-06-06 03:34:45 +08:00
|
|
|
// Add adds metrics to the buffer and returns number of dropped metrics.
|
|
|
|
|
func (b *Buffer) Add(metrics ...telegraf.Metric) int {
|
2018-11-06 05:34:28 +08:00
|
|
|
b.Lock()
|
|
|
|
|
defer b.Unlock()
|
|
|
|
|
|
2019-06-06 03:34:45 +08:00
|
|
|
dropped := 0
|
2018-11-06 05:34:28 +08:00
|
|
|
for i := range metrics {
|
2022-10-13 03:19:47 +08:00
|
|
|
if n := b.addMetric(metrics[i]); n != 0 {
|
2019-06-06 03:34:45 +08:00
|
|
|
dropped += n
|
|
|
|
|
}
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
2019-01-23 05:43:51 +08:00
|
|
|
|
|
|
|
|
b.BufferSize.Set(int64(b.length()))
|
2019-06-06 03:34:45 +08:00
|
|
|
return dropped
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
// Batch returns a slice containing up to batchSize of the oldest metrics not
|
|
|
|
|
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
|
2019-01-16 03:48:52 +08:00
|
|
|
// batch must not be modified by the client.
|
2018-11-06 05:34:28 +08:00
|
|
|
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
|
|
|
|
|
b.Lock()
|
|
|
|
|
defer b.Unlock()
|
|
|
|
|
|
|
|
|
|
outLen := min(b.size, batchSize)
|
|
|
|
|
out := make([]telegraf.Metric, outLen)
|
|
|
|
|
if outLen == 0 {
|
|
|
|
|
return out
|
|
|
|
|
}
|
|
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
b.batchFirst = b.first
|
2018-11-06 05:34:28 +08:00
|
|
|
b.batchSize = outLen
|
|
|
|
|
|
2019-01-16 03:48:52 +08:00
|
|
|
batchIndex := b.batchFirst
|
|
|
|
|
for i := range out {
|
2020-07-15 02:32:54 +08:00
|
|
|
out[i] = b.buf[batchIndex]
|
2019-01-16 03:48:52 +08:00
|
|
|
b.buf[batchIndex] = nil
|
|
|
|
|
batchIndex = b.next(batchIndex)
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
2019-01-16 03:48:52 +08:00
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
b.first = b.nextby(b.first, b.batchSize)
|
2019-01-16 03:48:52 +08:00
|
|
|
b.size -= outLen
|
2018-11-06 05:34:28 +08:00
|
|
|
return out
|
|
|
|
|
}
|
|
|
|
|
|
2019-01-16 03:48:52 +08:00
|
|
|
// Accept marks the batch, acquired from Batch(), as successfully written.
|
2018-11-06 05:34:28 +08:00
|
|
|
func (b *Buffer) Accept(batch []telegraf.Metric) {
|
|
|
|
|
b.Lock()
|
|
|
|
|
defer b.Unlock()
|
|
|
|
|
|
|
|
|
|
for _, m := range batch {
|
|
|
|
|
b.metricWritten(m)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.resetBatch()
|
2019-01-23 05:43:51 +08:00
|
|
|
b.BufferSize.Set(int64(b.length()))
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
|
2019-01-16 03:48:52 +08:00
|
|
|
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
|
|
|
|
|
// as unsent.
|
2018-11-06 05:34:28 +08:00
|
|
|
func (b *Buffer) Reject(batch []telegraf.Metric) {
|
|
|
|
|
b.Lock()
|
|
|
|
|
defer b.Unlock()
|
|
|
|
|
|
2019-03-05 04:36:19 +08:00
|
|
|
if len(batch) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2019-01-16 03:48:52 +08:00
|
|
|
free := b.cap - b.size
|
2020-07-15 02:32:54 +08:00
|
|
|
restore := min(len(batch), free)
|
|
|
|
|
skip := len(batch) - restore
|
2019-01-16 03:48:52 +08:00
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
b.first = b.prevby(b.first, restore)
|
|
|
|
|
b.size = min(b.size+restore, b.cap)
|
2019-03-05 04:36:19 +08:00
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
re := b.first
|
2019-01-16 03:48:52 +08:00
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
// Copy metrics from the batch back into the buffer
|
2019-01-16 03:48:52 +08:00
|
|
|
for i := range batch {
|
2020-07-15 02:32:54 +08:00
|
|
|
if i < skip {
|
2019-01-16 03:48:52 +08:00
|
|
|
b.metricDropped(batch[i])
|
2020-07-15 02:32:54 +08:00
|
|
|
} else {
|
|
|
|
|
b.buf[re] = batch[i]
|
|
|
|
|
re = b.next(re)
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.resetBatch()
|
2019-01-23 05:43:51 +08:00
|
|
|
b.BufferSize.Set(int64(b.length()))
|
2018-11-06 05:34:28 +08:00
|
|
|
}
|
|
|
|
|
|
2019-01-16 03:48:52 +08:00
|
|
|
// next returns the next index with wrapping.
|
|
|
|
|
func (b *Buffer) next(index int) int {
|
|
|
|
|
index++
|
|
|
|
|
if index == b.cap {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
return index
|
|
|
|
|
}
|
|
|
|
|
|
2022-01-05 23:20:10 +08:00
|
|
|
// nextby returns the index that is count newer with wrapping.
|
2019-01-16 03:48:52 +08:00
|
|
|
func (b *Buffer) nextby(index, count int) int {
|
|
|
|
|
index += count
|
|
|
|
|
index %= b.cap
|
|
|
|
|
return index
|
|
|
|
|
}
|
|
|
|
|
|
2020-07-15 02:32:54 +08:00
|
|
|
// prevby returns the index that is count older with wrapping.
|
|
|
|
|
func (b *Buffer) prevby(index, count int) int {
|
|
|
|
|
index -= count
|
|
|
|
|
for index < 0 {
|
|
|
|
|
index += b.cap
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
index %= b.cap
|
|
|
|
|
return index
|
|
|
|
|
}
|
|
|
|
|
|
2018-11-06 05:34:28 +08:00
|
|
|
func (b *Buffer) resetBatch() {
|
|
|
|
|
b.batchFirst = 0
|
|
|
|
|
b.batchSize = 0
|
|
|
|
|
}
|