chore: resolve linter issues for confusing-naming and confusing-results (#11956)
This commit is contained in:
parent
0087a5d245
commit
85b7590ff9
|
|
@ -37,7 +37,7 @@ func (ac *accumulator) AddFields(
|
|||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
ac.addFields(measurement, tags, fields, telegraf.Untyped, t...)
|
||||
ac.addMeasurement(measurement, tags, fields, telegraf.Untyped, t...)
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddGauge(
|
||||
|
|
@ -46,7 +46,7 @@ func (ac *accumulator) AddGauge(
|
|||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
ac.addFields(measurement, tags, fields, telegraf.Gauge, t...)
|
||||
ac.addMeasurement(measurement, tags, fields, telegraf.Gauge, t...)
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddCounter(
|
||||
|
|
@ -55,7 +55,7 @@ func (ac *accumulator) AddCounter(
|
|||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
ac.addFields(measurement, tags, fields, telegraf.Counter, t...)
|
||||
ac.addMeasurement(measurement, tags, fields, telegraf.Counter, t...)
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddSummary(
|
||||
|
|
@ -64,7 +64,7 @@ func (ac *accumulator) AddSummary(
|
|||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
ac.addFields(measurement, tags, fields, telegraf.Summary, t...)
|
||||
ac.addMeasurement(measurement, tags, fields, telegraf.Summary, t...)
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddHistogram(
|
||||
|
|
@ -73,7 +73,7 @@ func (ac *accumulator) AddHistogram(
|
|||
tags map[string]string,
|
||||
t ...time.Time,
|
||||
) {
|
||||
ac.addFields(measurement, tags, fields, telegraf.Histogram, t...)
|
||||
ac.addMeasurement(measurement, tags, fields, telegraf.Histogram, t...)
|
||||
}
|
||||
|
||||
func (ac *accumulator) AddMetric(m telegraf.Metric) {
|
||||
|
|
@ -83,7 +83,7 @@ func (ac *accumulator) AddMetric(m telegraf.Metric) {
|
|||
}
|
||||
}
|
||||
|
||||
func (ac *accumulator) addFields(
|
||||
func (ac *accumulator) addMeasurement(
|
||||
measurement string,
|
||||
tags map[string]string,
|
||||
fields map[string]interface{},
|
||||
|
|
|
|||
106
agent/agent.go
106
agent/agent.go
|
|
@ -58,7 +58,7 @@ type processorUnit struct {
|
|||
}
|
||||
|
||||
// aggregatorUnit is a group of Aggregators and their source and sink channels.
|
||||
// Typically the aggregators write to a processor channel and pass the original
|
||||
// Typically, the aggregators write to a processor channel and pass the original
|
||||
// metrics to the output channel. The sink channels may be the same channel.
|
||||
|
||||
// ┌────────────┐
|
||||
|
|
@ -281,46 +281,7 @@ func (a *Agent) runInputs(
|
|||
) {
|
||||
var wg sync.WaitGroup
|
||||
for _, input := range unit.inputs {
|
||||
// Overwrite agent interval if this plugin has its own.
|
||||
interval := time.Duration(a.Config.Agent.Interval)
|
||||
if input.Config.Interval != 0 {
|
||||
interval = input.Config.Interval
|
||||
}
|
||||
|
||||
// Overwrite agent precision if this plugin has its own.
|
||||
precision := time.Duration(a.Config.Agent.Precision)
|
||||
if input.Config.Precision != 0 {
|
||||
precision = input.Config.Precision
|
||||
}
|
||||
|
||||
// Overwrite agent collection_jitter if this plugin has its own.
|
||||
jitter := time.Duration(a.Config.Agent.CollectionJitter)
|
||||
if input.Config.CollectionJitter != 0 {
|
||||
jitter = input.Config.CollectionJitter
|
||||
}
|
||||
|
||||
// Overwrite agent collection_offset if this plugin has its own.
|
||||
offset := time.Duration(a.Config.Agent.CollectionOffset)
|
||||
if input.Config.CollectionOffset != 0 {
|
||||
offset = input.Config.CollectionOffset
|
||||
}
|
||||
|
||||
var ticker Ticker
|
||||
if a.Config.Agent.RoundInterval {
|
||||
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
|
||||
} else {
|
||||
ticker = NewUnalignedTicker(interval, jitter, offset)
|
||||
}
|
||||
defer ticker.Stop()
|
||||
|
||||
acc := NewAccumulator(input, unit.dst)
|
||||
acc.SetPrecision(getPrecision(precision, interval))
|
||||
|
||||
wg.Add(1)
|
||||
go func(input *models.RunningInput) {
|
||||
defer wg.Done()
|
||||
a.gatherLoop(ctx, acc, input, ticker, interval)
|
||||
}(input)
|
||||
a.runInput(ctx, startTime, unit, input, &wg)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
|
@ -332,6 +293,49 @@ func (a *Agent) runInputs(
|
|||
log.Printf("D! [agent] Input channel closed")
|
||||
}
|
||||
|
||||
func (a *Agent) runInput(ctx context.Context, startTime time.Time, unit *inputUnit, input *models.RunningInput, wg *sync.WaitGroup) {
|
||||
// Overwrite agent interval if this plugin has its own.
|
||||
interval := time.Duration(a.Config.Agent.Interval)
|
||||
if input.Config.Interval != 0 {
|
||||
interval = input.Config.Interval
|
||||
}
|
||||
|
||||
// Overwrite agent precision if this plugin has its own.
|
||||
precision := time.Duration(a.Config.Agent.Precision)
|
||||
if input.Config.Precision != 0 {
|
||||
precision = input.Config.Precision
|
||||
}
|
||||
|
||||
// Overwrite agent collection_jitter if this plugin has its own.
|
||||
jitter := time.Duration(a.Config.Agent.CollectionJitter)
|
||||
if input.Config.CollectionJitter != 0 {
|
||||
jitter = input.Config.CollectionJitter
|
||||
}
|
||||
|
||||
// Overwrite agent collection_offset if this plugin has its own.
|
||||
offset := time.Duration(a.Config.Agent.CollectionOffset)
|
||||
if input.Config.CollectionOffset != 0 {
|
||||
offset = input.Config.CollectionOffset
|
||||
}
|
||||
|
||||
var ticker Ticker
|
||||
if a.Config.Agent.RoundInterval {
|
||||
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
|
||||
} else {
|
||||
ticker = NewUnalignedTicker(interval, jitter, offset)
|
||||
}
|
||||
defer ticker.Stop()
|
||||
|
||||
acc := NewAccumulator(input, unit.dst)
|
||||
acc.SetPrecision(getPrecision(precision, interval))
|
||||
|
||||
wg.Add(1)
|
||||
go func(input *models.RunningInput) {
|
||||
defer wg.Done()
|
||||
a.gatherLoop(ctx, acc, input, ticker, interval)
|
||||
}(input)
|
||||
}
|
||||
|
||||
// testStartInputs is a variation of startInputs for use in --test and --once
|
||||
// mode. It differs by logging Start errors and returning only plugins
|
||||
// successfully started.
|
||||
|
|
@ -377,6 +381,7 @@ func (a *Agent) testRunInputs(
|
|||
|
||||
nul := make(chan telegraf.Metric)
|
||||
go func() {
|
||||
//nolint:revive // empty block needed here
|
||||
for range nul {
|
||||
}
|
||||
}()
|
||||
|
|
@ -721,7 +726,7 @@ func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput)
|
|||
|
||||
err = output.Output.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error connecting to output %q: %w", output.LogName(), err)
|
||||
return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err)
|
||||
}
|
||||
}
|
||||
log.Printf("D! [agent] Successfully connected to %s", output.LogName())
|
||||
|
|
@ -882,7 +887,7 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
|
|||
}
|
||||
}()
|
||||
|
||||
err := a.test(ctx, wait, src)
|
||||
err := a.runTest(ctx, wait, src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -895,10 +900,10 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Test runs the agent and performs a single gather sending output to the
|
||||
// outputF. After gathering pauses for the wait duration to allow service
|
||||
// runTest runs the agent and performs a single gather sending output to the
|
||||
// outputC. After gathering pauses for the wait duration to allow service
|
||||
// inputs to run.
|
||||
func (a *Agent) test(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
|
||||
func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
|
||||
log.Printf("D! [agent] Initializing plugins")
|
||||
err := a.initPlugins()
|
||||
if err != nil {
|
||||
|
|
@ -971,7 +976,7 @@ func (a *Agent) test(ctx context.Context, wait time.Duration, outputC chan<- tel
|
|||
|
||||
// Once runs the full agent for a single gather.
|
||||
func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
|
||||
err := a.once(ctx, wait)
|
||||
err := a.runOnce(ctx, wait)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -990,10 +995,10 @@ func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// On runs the agent and performs a single gather sending output to the
|
||||
// outputF. After gathering pauses for the wait duration to allow service
|
||||
// runOnce runs the agent and performs a single gather sending output to the
|
||||
// outputC. After gathering pauses for the wait duration to allow service
|
||||
// inputs to run.
|
||||
func (a *Agent) once(ctx context.Context, wait time.Duration) error {
|
||||
func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
|
||||
log.Printf("D! [agent] Initializing plugins")
|
||||
err := a.initPlugins()
|
||||
if err != nil {
|
||||
|
|
@ -1094,6 +1099,7 @@ func getPrecision(precision, interval time.Duration) time.Duration {
|
|||
|
||||
// panicRecover displays an error if an input panics.
|
||||
func panicRecover(input *models.RunningInput) {
|
||||
//nolint:revive // recover is called inside a deferred function
|
||||
if err := recover(); err != nil {
|
||||
trace := make([]byte, 2048)
|
||||
runtime.Stack(trace, true)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/influxdata/toml"
|
||||
"github.com/influxdata/toml/ast"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
|
|
@ -31,13 +33,11 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
"github.com/influxdata/toml"
|
||||
"github.com/influxdata/toml/ast"
|
||||
)
|
||||
|
||||
var (
|
||||
// envVarRe is a regex to find environment variables in the config file
|
||||
envVarRe = regexp.MustCompile(`\$\{(\w+)\}|\$(\w+)`)
|
||||
envVarRe = regexp.MustCompile(`\${(\w+)}|\$(\w+)`)
|
||||
|
||||
envVarEscaper = strings.NewReplacer(
|
||||
`"`, `\"`,
|
||||
|
|
@ -134,7 +134,7 @@ type AgentConfig struct {
|
|||
// specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
|
||||
// Valid time units are "ns", "us" (or "µs"), "ms", "s".
|
||||
//
|
||||
// By default or when set to "0s", precision will be set to the same
|
||||
// By default, or when set to "0s", precision will be set to the same
|
||||
// timestamp order as the collection interval, with the maximum being 1s:
|
||||
// ie, when interval = "10s", precision will be "1s"
|
||||
// when interval = "250ms", precision will be "1ms"
|
||||
|
|
@ -150,7 +150,7 @@ type AgentConfig struct {
|
|||
CollectionJitter Duration
|
||||
|
||||
// CollectionOffset is used to shift the collection by the given amount.
|
||||
// This can be be used to avoid many plugins querying constraint devices
|
||||
// This can be used to avoid many plugins querying constraint devices
|
||||
// at the same time by manually scheduling them in time.
|
||||
CollectionOffset Duration
|
||||
|
||||
|
|
@ -163,7 +163,7 @@ type AgentConfig struct {
|
|||
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
|
||||
FlushJitter Duration
|
||||
|
||||
// MetricBatchSize is the maximum number of metrics that is wrote to an
|
||||
// MetricBatchSize is the maximum number of metrics that is written to an
|
||||
// output plugin in one call.
|
||||
MetricBatchSize int
|
||||
|
||||
|
|
@ -357,7 +357,7 @@ func getDefaultConfigPath() (string, error) {
|
|||
}
|
||||
|
||||
// if we got here, we didn't find a file in a default location
|
||||
return "", fmt.Errorf("No config file specified, and could not find one"+
|
||||
return "", fmt.Errorf("no config file specified, and could not find one"+
|
||||
" in $TELEGRAF_CONFIG_PATH, %s, or %s", homefile, etcfile)
|
||||
}
|
||||
|
||||
|
|
@ -377,11 +377,11 @@ func (c *Config) LoadConfig(path string) error {
|
|||
}
|
||||
data, err := LoadConfigFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error loading config file %s: %w", path, err)
|
||||
return fmt.Errorf("error loading config file %s: %w", path, err)
|
||||
}
|
||||
|
||||
if err = c.LoadConfigData(data); err != nil {
|
||||
return fmt.Errorf("Error loading config file %s: %w", path, err)
|
||||
return fmt.Errorf("error loading config file %s: %w", path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -390,7 +390,7 @@ func (c *Config) LoadConfig(path string) error {
|
|||
func (c *Config) LoadConfigData(data []byte) error {
|
||||
tbl, err := parseConfig(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error parsing data: %s", err)
|
||||
return fmt.Errorf("error parsing data: %s", err)
|
||||
}
|
||||
|
||||
// Parse tags tables first:
|
||||
|
|
@ -485,7 +485,7 @@ func (c *Config) LoadConfigData(data []byte) error {
|
|||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("Unsupported config format: %s",
|
||||
return fmt.Errorf("unsupported config format: %s",
|
||||
pluginName)
|
||||
}
|
||||
if len(c.UnusedFields) > 0 {
|
||||
|
|
@ -502,7 +502,7 @@ func (c *Config) LoadConfigData(data []byte) error {
|
|||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("Unsupported config format: %s",
|
||||
return fmt.Errorf("unsupported config format: %s",
|
||||
pluginName)
|
||||
}
|
||||
if len(c.UnusedFields) > 0 {
|
||||
|
|
@ -515,22 +515,22 @@ func (c *Config) LoadConfigData(data []byte) error {
|
|||
case []*ast.Table:
|
||||
for _, t := range pluginSubTable {
|
||||
if err = c.addAggregator(pluginName, t); err != nil {
|
||||
return fmt.Errorf("Error parsing %s, %s", pluginName, err)
|
||||
return fmt.Errorf("error parsing %s, %s", pluginName, err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("Unsupported config format: %s",
|
||||
return fmt.Errorf("unsupported config format: %s",
|
||||
pluginName)
|
||||
}
|
||||
if len(c.UnusedFields) > 0 {
|
||||
return fmt.Errorf("plugin %s.%s: line %d: configuration specified the fields %q, but they weren't used", name, pluginName, subTable.Line, keys(c.UnusedFields))
|
||||
}
|
||||
}
|
||||
// Assume it's an input input for legacy config file support if no other
|
||||
// Assume it's an input for legacy config file support if no other
|
||||
// identifiers are present
|
||||
default:
|
||||
if err = c.addInput(name, subTable); err != nil {
|
||||
return fmt.Errorf("Error parsing %s, %s", name, err)
|
||||
return fmt.Errorf("error parsing %s, %s", name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -668,7 +668,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
|
|||
printHistoricPluginDeprecationNotice("aggregators", name, di)
|
||||
return fmt.Errorf("plugin deprecated")
|
||||
}
|
||||
return fmt.Errorf("Undefined but requested aggregator: %s", name)
|
||||
return fmt.Errorf("undefined but requested aggregator: %s", name)
|
||||
}
|
||||
aggregator := creator()
|
||||
|
||||
|
|
@ -717,7 +717,7 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
|
|||
|
||||
creator, ok := parsers.Parsers[dataformat]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat)
|
||||
return nil, fmt.Errorf("undefined but requested parser: %s", dataformat)
|
||||
}
|
||||
parser := creator(parentname)
|
||||
|
||||
|
|
@ -743,7 +743,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
|
|||
printHistoricPluginDeprecationNotice("processors", name, di)
|
||||
return fmt.Errorf("plugin deprecated")
|
||||
}
|
||||
return fmt.Errorf("Undefined but requested processor: %s", name)
|
||||
return fmt.Errorf("undefined but requested processor: %s", name)
|
||||
}
|
||||
streamingProcessor := creator()
|
||||
|
||||
|
|
@ -788,7 +788,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
|
|||
})
|
||||
}
|
||||
|
||||
// Setup the processor
|
||||
// Set up the processor
|
||||
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -895,7 +895,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
|
|||
return fmt.Errorf("plugin deprecated")
|
||||
}
|
||||
|
||||
return fmt.Errorf("Undefined but requested input: %s", name)
|
||||
return fmt.Errorf("undefined but requested input: %s", name)
|
||||
}
|
||||
input := creator()
|
||||
|
||||
|
|
@ -1067,8 +1067,8 @@ func (c *Config) buildFilter(tbl *ast.Table) (models.Filter, error) {
|
|||
c.getFieldStringSlice(tbl, "drop", &f.FieldDrop)
|
||||
c.getFieldStringSlice(tbl, "fielddrop", &f.FieldDrop)
|
||||
|
||||
c.getFieldTagFilter(tbl, "tagpass", &f.TagPass)
|
||||
c.getFieldTagFilter(tbl, "tagdrop", &f.TagDrop)
|
||||
c.getFieldTagFilter(tbl, "tagpass", &f.TagPassFilters)
|
||||
c.getFieldTagFilter(tbl, "tagdrop", &f.TagDropFilters)
|
||||
|
||||
c.getFieldStringSlice(tbl, "tagexclude", &f.TagExclude)
|
||||
c.getFieldStringSlice(tbl, "taginclude", &f.TagInclude)
|
||||
|
|
@ -1172,7 +1172,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
|
|||
}
|
||||
|
||||
// buildOutput parses output specific items from the ast.Table,
|
||||
// builds the filter and returns an
|
||||
// builds the filter and returns a
|
||||
// models.OutputConfig to be inserted into models.RunningInput
|
||||
// Note: error exists in the return for future calls that might require error
|
||||
func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
|
||||
|
|
@ -1370,7 +1370,7 @@ func (c *Config) getFieldTagFilter(tbl *ast.Table, fieldName string, target *[]m
|
|||
tagFilter := models.TagFilter{Name: name}
|
||||
for _, elem := range ary.Value {
|
||||
if str, ok := elem.(*ast.String); ok {
|
||||
tagFilter.Filter = append(tagFilter.Filter, str.Value)
|
||||
tagFilter.Values = append(tagFilter.Values, str.Value)
|
||||
}
|
||||
}
|
||||
*target = append(*target, tagFilter)
|
||||
|
|
|
|||
|
|
@ -70,16 +70,16 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
|
|||
NamePass: []string{"metricname1", "ip_192.168.1.1_name"},
|
||||
FieldDrop: []string{"other", "stuff"},
|
||||
FieldPass: []string{"some", "strings"},
|
||||
TagDrop: []models.TagFilter{
|
||||
TagDropFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "badtag",
|
||||
Filter: []string{"othertag"},
|
||||
Values: []string{"othertag"},
|
||||
},
|
||||
},
|
||||
TagPass: []models.TagFilter{
|
||||
TagPassFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "goodtag",
|
||||
Filter: []string{"mytag"},
|
||||
Values: []string{"mytag"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -110,16 +110,16 @@ func TestConfig_LoadSingleInput(t *testing.T) {
|
|||
NamePass: []string{"metricname1"},
|
||||
FieldDrop: []string{"other", "stuff"},
|
||||
FieldPass: []string{"some", "strings"},
|
||||
TagDrop: []models.TagFilter{
|
||||
TagDropFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "badtag",
|
||||
Filter: []string{"othertag"},
|
||||
Values: []string{"othertag"},
|
||||
},
|
||||
},
|
||||
TagPass: []models.TagFilter{
|
||||
TagPassFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "goodtag",
|
||||
Filter: []string{"mytag"},
|
||||
Values: []string{"mytag"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -155,16 +155,16 @@ func TestConfig_LoadDirectory(t *testing.T) {
|
|||
NamePass: []string{"metricname1"},
|
||||
FieldDrop: []string{"other", "stuff"},
|
||||
FieldPass: []string{"some", "strings"},
|
||||
TagDrop: []models.TagFilter{
|
||||
TagDropFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "badtag",
|
||||
Filter: []string{"othertag"},
|
||||
Values: []string{"othertag"},
|
||||
},
|
||||
},
|
||||
TagPass: []models.TagFilter{
|
||||
TagPassFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "goodtag",
|
||||
Filter: []string{"mytag"},
|
||||
Values: []string{"mytag"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -199,16 +199,16 @@ func TestConfig_LoadDirectory(t *testing.T) {
|
|||
NamePass: []string{"metricname1"},
|
||||
FieldDrop: []string{"other", "stuff"},
|
||||
FieldPass: []string{"some", "strings"},
|
||||
TagDrop: []models.TagFilter{
|
||||
TagDropFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "badtag",
|
||||
Filter: []string{"othertag"},
|
||||
Values: []string{"othertag"},
|
||||
},
|
||||
},
|
||||
TagPass: []models.TagFilter{
|
||||
TagPassFilters: []models.TagFilter{
|
||||
{
|
||||
Name: "goodtag",
|
||||
Filter: []string{"mytag"},
|
||||
Values: []string{"mytag"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -288,19 +288,19 @@ func TestConfig_FieldNotDefined(t *testing.T) {
|
|||
c := NewConfig()
|
||||
err := c.LoadConfig("./testdata/invalid_field.toml")
|
||||
require.Error(t, err, "invalid field name")
|
||||
require.Equal(t, "Error loading config file ./testdata/invalid_field.toml: plugin inputs.http_listener_v2: line 1: configuration specified the fields [\"not_a_field\"], but they weren't used", err.Error())
|
||||
require.Equal(t, "error loading config file ./testdata/invalid_field.toml: plugin inputs.http_listener_v2: line 1: configuration specified the fields [\"not_a_field\"], but they weren't used", err.Error())
|
||||
}
|
||||
|
||||
func TestConfig_WrongFieldType(t *testing.T) {
|
||||
c := NewConfig()
|
||||
err := c.LoadConfig("./testdata/wrong_field_type.toml")
|
||||
require.Error(t, err, "invalid field type")
|
||||
require.Equal(t, "Error loading config file ./testdata/wrong_field_type.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Port) cannot unmarshal TOML string into int", err.Error())
|
||||
require.Equal(t, "error loading config file ./testdata/wrong_field_type.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Port) cannot unmarshal TOML string into int", err.Error())
|
||||
|
||||
c = NewConfig()
|
||||
err = c.LoadConfig("./testdata/wrong_field_type2.toml")
|
||||
require.Error(t, err, "invalid field type2")
|
||||
require.Equal(t, "Error loading config file ./testdata/wrong_field_type2.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Methods) cannot unmarshal TOML string into []string", err.Error())
|
||||
require.Equal(t, "error loading config file ./testdata/wrong_field_type2.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Methods) cannot unmarshal TOML string into []string", err.Error())
|
||||
}
|
||||
|
||||
func TestConfig_InlineTables(t *testing.T) {
|
||||
|
|
@ -333,7 +333,7 @@ func TestConfig_BadOrdering(t *testing.T) {
|
|||
c := NewConfig()
|
||||
err := c.LoadConfig("./testdata/non_slice_slice.toml")
|
||||
require.Error(t, err, "bad ordering")
|
||||
require.Equal(t, "Error loading config file ./testdata/non_slice_slice.toml: error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)", err.Error())
|
||||
require.Equal(t, "error loading config file ./testdata/non_slice_slice.toml: error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)", err.Error())
|
||||
}
|
||||
|
||||
func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) {
|
||||
|
|
@ -359,7 +359,7 @@ func TestConfig_URLRetries3Fails(t *testing.T) {
|
|||
}))
|
||||
defer ts.Close()
|
||||
|
||||
expected := fmt.Sprintf("Error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL)
|
||||
expected := fmt.Sprintf("error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL)
|
||||
|
||||
c := NewConfig()
|
||||
err := c.LoadConfig(ts.URL)
|
||||
|
|
@ -408,10 +408,10 @@ func TestConfig_URLLikeFileName(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
|
||||
if runtime.GOOS == "windows" {
|
||||
// The error file not found error message is different on windows
|
||||
require.Equal(t, "Error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: The system cannot find the file specified.", err.Error())
|
||||
// The error file not found error message is different on Windows
|
||||
require.Equal(t, "error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: The system cannot find the file specified.", err.Error())
|
||||
} else {
|
||||
require.Equal(t, "Error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: no such file or directory", err.Error())
|
||||
require.Equal(t, "error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: no such file or directory", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -786,7 +786,7 @@ type MockupInputPluginParserOld struct {
|
|||
func (m *MockupInputPluginParserOld) SampleConfig() string {
|
||||
return "Mockup old parser test plugin"
|
||||
}
|
||||
func (m *MockupInputPluginParserOld) Gather(acc telegraf.Accumulator) error {
|
||||
func (m *MockupInputPluginParserOld) Gather(_ telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupInputPluginParserOld) SetParser(parser parsers.Parser) {
|
||||
|
|
@ -805,7 +805,7 @@ type MockupInputPluginParserNew struct {
|
|||
func (m *MockupInputPluginParserNew) SampleConfig() string {
|
||||
return "Mockup old parser test plugin"
|
||||
}
|
||||
func (m *MockupInputPluginParserNew) Gather(acc telegraf.Accumulator) error {
|
||||
func (m *MockupInputPluginParserNew) Gather(_ telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupInputPluginParserNew) SetParser(parser telegraf.Parser) {
|
||||
|
|
@ -836,7 +836,7 @@ type MockupInputPlugin struct {
|
|||
func (m *MockupInputPlugin) SampleConfig() string {
|
||||
return "Mockup test input plugin"
|
||||
}
|
||||
func (m *MockupInputPlugin) Gather(acc telegraf.Accumulator) error {
|
||||
func (m *MockupInputPlugin) Gather(_ telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupInputPlugin) SetParser(parser telegraf.Parser) {
|
||||
|
|
@ -849,7 +849,7 @@ type MockupProcessorPluginParser struct {
|
|||
ParserFunc telegraf.ParserFunc
|
||||
}
|
||||
|
||||
func (m *MockupProcessorPluginParser) Start(acc telegraf.Accumulator) error {
|
||||
func (m *MockupProcessorPluginParser) Start(_ telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupProcessorPluginParser) Stop() error {
|
||||
|
|
@ -858,10 +858,10 @@ func (m *MockupProcessorPluginParser) Stop() error {
|
|||
func (m *MockupProcessorPluginParser) SampleConfig() string {
|
||||
return "Mockup test processor plugin with parser"
|
||||
}
|
||||
func (m *MockupProcessorPluginParser) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||
func (m *MockupProcessorPluginParser) Apply(_ ...telegraf.Metric) []telegraf.Metric {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupProcessorPluginParser) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
|
||||
func (m *MockupProcessorPluginParser) Add(_ telegraf.Metric, _ telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupProcessorPluginParser) SetParser(parser telegraf.Parser) {
|
||||
|
|
@ -890,7 +890,7 @@ func (m *MockupOuputPlugin) Close() error {
|
|||
func (m *MockupOuputPlugin) SampleConfig() string {
|
||||
return "Mockup test output plugin"
|
||||
}
|
||||
func (m *MockupOuputPlugin) Write(metrics []telegraf.Metric) error {
|
||||
func (m *MockupOuputPlugin) Write(_ []telegraf.Metric) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,10 +2,9 @@ package docker
|
|||
|
||||
import "strings"
|
||||
|
||||
// Adapts some of the logic from the actual Docker library's image parsing
|
||||
// routines:
|
||||
// ParseImage Adapts some of the logic from the actual Docker library's image parsing routines:
|
||||
// https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go
|
||||
func ParseImage(image string) (string, string) {
|
||||
func ParseImage(image string) (imageName string, imageVersion string) {
|
||||
domain := ""
|
||||
remainder := ""
|
||||
|
||||
|
|
@ -17,9 +16,8 @@ func ParseImage(image string) (string, string) {
|
|||
domain, remainder = image[:i], image[i+1:]
|
||||
}
|
||||
|
||||
imageName := ""
|
||||
imageVersion := "unknown"
|
||||
|
||||
imageName = ""
|
||||
imageVersion = "unknown"
|
||||
i = strings.LastIndex(remainder, ":")
|
||||
if i > -1 {
|
||||
imageVersion = remainder[i+1:]
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ func (b *Buffer) metricDropped(metric telegraf.Metric) {
|
|||
metric.Reject()
|
||||
}
|
||||
|
||||
func (b *Buffer) add(m telegraf.Metric) int {
|
||||
func (b *Buffer) addMetric(m telegraf.Metric) int {
|
||||
dropped := 0
|
||||
// Check if Buffer is full
|
||||
if b.size == b.cap {
|
||||
|
|
@ -137,7 +137,7 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) int {
|
|||
|
||||
dropped := 0
|
||||
for i := range metrics {
|
||||
if n := b.add(metrics[i]); n != 0 {
|
||||
if n := b.addMetric(metrics[i]); n != 0 {
|
||||
dropped += n
|
||||
}
|
||||
}
|
||||
|
|
|
|||
100
models/filter.go
100
models/filter.go
|
|
@ -10,29 +10,29 @@ import (
|
|||
// TagFilter is the name of a tag, and the values on which to filter
|
||||
type TagFilter struct {
|
||||
Name string
|
||||
Filter []string
|
||||
Values []string
|
||||
filter filter.Filter
|
||||
}
|
||||
|
||||
// Filter containing drop/pass and tagdrop/tagpass rules
|
||||
type Filter struct {
|
||||
NameDrop []string
|
||||
nameDrop filter.Filter
|
||||
NamePass []string
|
||||
namePass filter.Filter
|
||||
NameDrop []string
|
||||
nameDropFilter filter.Filter
|
||||
NamePass []string
|
||||
namePassFilter filter.Filter
|
||||
|
||||
FieldDrop []string
|
||||
fieldDrop filter.Filter
|
||||
FieldPass []string
|
||||
fieldPass filter.Filter
|
||||
FieldDrop []string
|
||||
fieldDropFilter filter.Filter
|
||||
FieldPass []string
|
||||
fieldPassFilter filter.Filter
|
||||
|
||||
TagDrop []TagFilter
|
||||
TagPass []TagFilter
|
||||
TagDropFilters []TagFilter
|
||||
TagPassFilters []TagFilter
|
||||
|
||||
TagExclude []string
|
||||
tagExclude filter.Filter
|
||||
TagInclude []string
|
||||
tagInclude filter.Filter
|
||||
TagExclude []string
|
||||
tagExcludeFilter filter.Filter
|
||||
TagInclude []string
|
||||
tagIncludeFilter filter.Filter
|
||||
|
||||
isActive bool
|
||||
}
|
||||
|
|
@ -45,48 +45,48 @@ func (f *Filter) Compile() error {
|
|||
len(f.FieldPass) == 0 &&
|
||||
len(f.TagInclude) == 0 &&
|
||||
len(f.TagExclude) == 0 &&
|
||||
len(f.TagPass) == 0 &&
|
||||
len(f.TagDrop) == 0 {
|
||||
len(f.TagPassFilters) == 0 &&
|
||||
len(f.TagDropFilters) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
f.isActive = true
|
||||
var err error
|
||||
f.nameDrop, err = filter.Compile(f.NameDrop)
|
||||
f.nameDropFilter, err = filter.Compile(f.NameDrop)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'namedrop', %s", err)
|
||||
}
|
||||
f.namePass, err = filter.Compile(f.NamePass)
|
||||
f.namePassFilter, err = filter.Compile(f.NamePass)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'namepass', %s", err)
|
||||
}
|
||||
|
||||
f.fieldDrop, err = filter.Compile(f.FieldDrop)
|
||||
f.fieldDropFilter, err = filter.Compile(f.FieldDrop)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'fielddrop', %s", err)
|
||||
}
|
||||
f.fieldPass, err = filter.Compile(f.FieldPass)
|
||||
f.fieldPassFilter, err = filter.Compile(f.FieldPass)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'fieldpass', %s", err)
|
||||
}
|
||||
|
||||
f.tagExclude, err = filter.Compile(f.TagExclude)
|
||||
f.tagExcludeFilter, err = filter.Compile(f.TagExclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'tagexclude', %s", err)
|
||||
}
|
||||
f.tagInclude, err = filter.Compile(f.TagInclude)
|
||||
f.tagIncludeFilter, err = filter.Compile(f.TagInclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'taginclude', %s", err)
|
||||
}
|
||||
|
||||
for i := range f.TagDrop {
|
||||
f.TagDrop[i].filter, err = filter.Compile(f.TagDrop[i].Filter)
|
||||
for i := range f.TagDropFilters {
|
||||
f.TagDropFilters[i].filter, err = filter.Compile(f.TagDropFilters[i].Values)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'tagdrop', %s", err)
|
||||
}
|
||||
}
|
||||
for i := range f.TagPass {
|
||||
f.TagPass[i].filter, err = filter.Compile(f.TagPass[i].Filter)
|
||||
for i := range f.TagPassFilters {
|
||||
f.TagPassFilters[i].filter, err = filter.Compile(f.TagPassFilters[i].Values)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'tagpass', %s", err)
|
||||
}
|
||||
|
|
@ -128,46 +128,46 @@ func (f *Filter) IsActive() bool {
|
|||
return f.isActive
|
||||
}
|
||||
|
||||
// shouldNamePass returns true if the metric should pass, false if should drop
|
||||
// shouldNamePass returns true if the metric should pass, false if it should drop
|
||||
// based on the drop/pass filter parameters
|
||||
func (f *Filter) shouldNamePass(key string) bool {
|
||||
pass := func(f *Filter) bool {
|
||||
return f.namePass.Match(key)
|
||||
return f.namePassFilter.Match(key)
|
||||
}
|
||||
|
||||
drop := func(f *Filter) bool {
|
||||
return !f.nameDrop.Match(key)
|
||||
return !f.nameDropFilter.Match(key)
|
||||
}
|
||||
|
||||
if f.namePass != nil && f.nameDrop != nil {
|
||||
if f.namePassFilter != nil && f.nameDropFilter != nil {
|
||||
return pass(f) && drop(f)
|
||||
} else if f.namePass != nil {
|
||||
} else if f.namePassFilter != nil {
|
||||
return pass(f)
|
||||
} else if f.nameDrop != nil {
|
||||
} else if f.nameDropFilter != nil {
|
||||
return drop(f)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// shouldFieldPass returns true if the metric should pass, false if should drop
|
||||
// shouldFieldPass returns true if the metric should pass, false if it should drop
|
||||
// based on the drop/pass filter parameters
|
||||
func (f *Filter) shouldFieldPass(key string) bool {
|
||||
if f.fieldPass != nil && f.fieldDrop != nil {
|
||||
return f.fieldPass.Match(key) && !f.fieldDrop.Match(key)
|
||||
} else if f.fieldPass != nil {
|
||||
return f.fieldPass.Match(key)
|
||||
} else if f.fieldDrop != nil {
|
||||
return !f.fieldDrop.Match(key)
|
||||
if f.fieldPassFilter != nil && f.fieldDropFilter != nil {
|
||||
return f.fieldPassFilter.Match(key) && !f.fieldDropFilter.Match(key)
|
||||
} else if f.fieldPassFilter != nil {
|
||||
return f.fieldPassFilter.Match(key)
|
||||
} else if f.fieldDropFilter != nil {
|
||||
return !f.fieldDropFilter.Match(key)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// shouldTagsPass returns true if the metric should pass, false if should drop
|
||||
// shouldTagsPass returns true if the metric should pass, false if it should drop
|
||||
// based on the tagdrop/tagpass filter parameters
|
||||
func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool {
|
||||
pass := func(f *Filter) bool {
|
||||
for _, pat := range f.TagPass {
|
||||
for _, pat := range f.TagPassFilters {
|
||||
if pat.filter == nil {
|
||||
continue
|
||||
}
|
||||
|
|
@ -183,7 +183,7 @@ func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool {
|
|||
}
|
||||
|
||||
drop := func(f *Filter) bool {
|
||||
for _, pat := range f.TagDrop {
|
||||
for _, pat := range f.TagDropFilters {
|
||||
if pat.filter == nil {
|
||||
continue
|
||||
}
|
||||
|
|
@ -200,13 +200,13 @@ func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool {
|
|||
|
||||
// Add additional logic in case where both parameters are set.
|
||||
// see: https://github.com/influxdata/telegraf/issues/2860
|
||||
if f.TagPass != nil && f.TagDrop != nil {
|
||||
if f.TagPassFilters != nil && f.TagDropFilters != nil {
|
||||
// return true only in case when tag pass and won't be dropped (true, true).
|
||||
// in case when the same tag should be passed and dropped it will be dropped (true, false).
|
||||
return pass(f) && drop(f)
|
||||
} else if f.TagPass != nil {
|
||||
} else if f.TagPassFilters != nil {
|
||||
return pass(f)
|
||||
} else if f.TagDrop != nil {
|
||||
} else if f.TagDropFilters != nil {
|
||||
return drop(f)
|
||||
}
|
||||
|
||||
|
|
@ -230,9 +230,9 @@ func (f *Filter) filterFields(metric telegraf.Metric) {
|
|||
// filterTags removes tags according to taginclude/tagexclude.
|
||||
func (f *Filter) filterTags(metric telegraf.Metric) {
|
||||
filterKeys := []string{}
|
||||
if f.tagInclude != nil {
|
||||
if f.tagIncludeFilter != nil {
|
||||
for _, tag := range metric.TagList() {
|
||||
if !f.tagInclude.Match(tag.Key) {
|
||||
if !f.tagIncludeFilter.Match(tag.Key) {
|
||||
filterKeys = append(filterKeys, tag.Key)
|
||||
}
|
||||
}
|
||||
|
|
@ -241,9 +241,9 @@ func (f *Filter) filterTags(metric telegraf.Metric) {
|
|||
metric.RemoveTag(key)
|
||||
}
|
||||
|
||||
if f.tagExclude != nil {
|
||||
if f.tagExcludeFilter != nil {
|
||||
for _, tag := range metric.TagList() {
|
||||
if f.tagExclude.Match(tag.Key) {
|
||||
if f.tagExcludeFilter.Match(tag.Key) {
|
||||
filterKeys = append(filterKeys, tag.Key)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,10 +4,11 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFilter_ApplyEmpty(t *testing.T) {
|
||||
|
|
@ -26,11 +27,11 @@ func TestFilter_ApplyTagsDontPass(t *testing.T) {
|
|||
filters := []TagFilter{
|
||||
{
|
||||
Name: "cpu",
|
||||
Filter: []string{"cpu-*"},
|
||||
Values: []string{"cpu-*"},
|
||||
},
|
||||
}
|
||||
f := Filter{
|
||||
TagDrop: filters,
|
||||
TagDropFilters: filters,
|
||||
}
|
||||
require.NoError(t, f.Compile())
|
||||
require.NoError(t, f.Compile())
|
||||
|
|
@ -243,14 +244,14 @@ func TestFilter_TagPass(t *testing.T) {
|
|||
filters := []TagFilter{
|
||||
{
|
||||
Name: "cpu",
|
||||
Filter: []string{"cpu-*"},
|
||||
Values: []string{"cpu-*"},
|
||||
},
|
||||
{
|
||||
Name: "mem",
|
||||
Filter: []string{"mem_free"},
|
||||
Values: []string{"mem_free"},
|
||||
}}
|
||||
f := Filter{
|
||||
TagPass: filters,
|
||||
TagPassFilters: filters,
|
||||
}
|
||||
require.NoError(t, f.Compile())
|
||||
|
||||
|
|
@ -287,14 +288,14 @@ func TestFilter_TagDrop(t *testing.T) {
|
|||
filters := []TagFilter{
|
||||
{
|
||||
Name: "cpu",
|
||||
Filter: []string{"cpu-*"},
|
||||
Values: []string{"cpu-*"},
|
||||
},
|
||||
{
|
||||
Name: "mem",
|
||||
Filter: []string{"mem_free"},
|
||||
Values: []string{"mem_free"},
|
||||
}}
|
||||
f := Filter{
|
||||
TagDrop: filters,
|
||||
TagDropFilters: filters,
|
||||
}
|
||||
require.NoError(t, f.Compile())
|
||||
|
||||
|
|
@ -445,24 +446,24 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) {
|
|||
filterPass := []TagFilter{
|
||||
{
|
||||
Name: "tag1",
|
||||
Filter: []string{"1", "4"},
|
||||
Values: []string{"1", "4"},
|
||||
},
|
||||
}
|
||||
|
||||
filterDrop := []TagFilter{
|
||||
{
|
||||
Name: "tag1",
|
||||
Filter: []string{"4"},
|
||||
Values: []string{"4"},
|
||||
},
|
||||
{
|
||||
Name: "tag2",
|
||||
Filter: []string{"3"},
|
||||
Values: []string{"3"},
|
||||
},
|
||||
}
|
||||
|
||||
f := Filter{
|
||||
TagDrop: filterDrop,
|
||||
TagPass: filterPass,
|
||||
TagDropFilters: filterDrop,
|
||||
TagPassFilters: filterPass,
|
||||
}
|
||||
|
||||
require.NoError(t, f.Compile())
|
||||
|
|
|
|||
|
|
@ -163,15 +163,11 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) {
|
|||
until := r.periodEnd.Add(r.Config.Period)
|
||||
r.UpdateWindow(since, until)
|
||||
|
||||
r.push(acc)
|
||||
r.Aggregator.Reset()
|
||||
}
|
||||
|
||||
func (r *RunningAggregator) push(acc telegraf.Accumulator) {
|
||||
start := time.Now()
|
||||
r.Aggregator.Push(acc)
|
||||
elapsed := time.Since(start)
|
||||
r.PushTime.Incr(elapsed.Nanoseconds())
|
||||
r.Aggregator.Reset()
|
||||
}
|
||||
|
||||
func (r *RunningAggregator) Log() telegraf.Logger {
|
||||
|
|
|
|||
|
|
@ -198,7 +198,7 @@ func (r *RunningOutput) Write() error {
|
|||
break
|
||||
}
|
||||
|
||||
err := r.write(batch)
|
||||
err := r.writeMetrics(batch)
|
||||
if err != nil {
|
||||
r.buffer.Reject(batch)
|
||||
return err
|
||||
|
|
@ -215,7 +215,7 @@ func (r *RunningOutput) WriteBatch() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
err := r.write(batch)
|
||||
err := r.writeMetrics(batch)
|
||||
if err != nil {
|
||||
r.buffer.Reject(batch)
|
||||
return err
|
||||
|
|
@ -233,7 +233,7 @@ func (r *RunningOutput) Close() {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *RunningOutput) write(metrics []telegraf.Metric) error {
|
||||
func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error {
|
||||
dropped := atomic.LoadInt64(&r.droppedMetrics)
|
||||
if dropped > 0 {
|
||||
r.log.Warnf("Metric buffer overflow; %d metrics have been dropped", dropped)
|
||||
|
|
|
|||
|
|
@ -7,13 +7,13 @@ import (
|
|||
"golang.org/x/text/encoding/unicode"
|
||||
)
|
||||
|
||||
// NewDecoder returns a x/text Decoder for the specified text encoding. The
|
||||
// NewDecoder returns an x/text Decoder for the specified text encoding. The
|
||||
// Decoder converts a character encoding into utf-8 bytes. If a BOM is found
|
||||
// it will be converted into a utf-8 BOM, you can use
|
||||
// it will be converted into an utf-8 BOM, you can use
|
||||
// github.com/dimchansky/utfbom to strip the BOM.
|
||||
//
|
||||
// The "none" or "" encoding will pass through bytes unchecked. Use the utf-8
|
||||
// encoding if you want invalid bytes replaced using the the unicode
|
||||
// encoding if you want invalid bytes replaced using the unicode
|
||||
// replacement character.
|
||||
//
|
||||
// Detection of utf-16 endianness using the BOM is not currently provided due
|
||||
|
|
@ -22,13 +22,13 @@ import (
|
|||
func NewDecoder(enc string) (*Decoder, error) {
|
||||
switch enc {
|
||||
case "utf-8":
|
||||
return &Decoder{Transformer: unicode.UTF8.NewDecoder()}, nil
|
||||
return createDecoder(unicode.UTF8.NewDecoder()), nil
|
||||
case "utf-16le":
|
||||
return newDecoder(unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()), nil
|
||||
return createDecoder(unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()), nil
|
||||
case "utf-16be":
|
||||
return newDecoder(unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder()), nil
|
||||
return createDecoder(unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder()), nil
|
||||
case "none", "":
|
||||
return newDecoder(encoding.Nop.NewDecoder()), nil
|
||||
return createDecoder(encoding.Nop.NewDecoder()), nil
|
||||
}
|
||||
return nil, errors.New("unknown character encoding")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import (
|
|||
// Other than resetting r.err and r.transformComplete in Read() this
|
||||
// was copied from x/text
|
||||
|
||||
func newDecoder(t transform.Transformer) *Decoder {
|
||||
func createDecoder(t transform.Transformer) *Decoder {
|
||||
return &Decoder{Transformer: t}
|
||||
}
|
||||
|
||||
|
|
@ -73,7 +73,7 @@ type Reader struct {
|
|||
src0, src1 int
|
||||
|
||||
// transformComplete is whether the transformation is complete,
|
||||
// regardless of whether or not it was successful.
|
||||
// regardless of whether it was successful.
|
||||
transformComplete bool
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,8 +9,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -78,7 +79,7 @@ func (a *Accumulator) ClearMetrics() {
|
|||
a.Metrics = make([]*Metric, 0)
|
||||
}
|
||||
|
||||
func (a *Accumulator) addFields(
|
||||
func (a *Accumulator) addMeasurement(
|
||||
measurement string,
|
||||
tags map[string]string,
|
||||
fields map[string]interface{},
|
||||
|
|
@ -113,7 +114,6 @@ func (a *Accumulator) addFields(
|
|||
if len(timestamp) > 0 {
|
||||
t = timestamp[0]
|
||||
} else {
|
||||
t = time.Now()
|
||||
if a.TimeFunc == nil {
|
||||
t = time.Now()
|
||||
} else {
|
||||
|
|
@ -147,7 +147,7 @@ func (a *Accumulator) AddFields(
|
|||
tags map[string]string,
|
||||
timestamp ...time.Time,
|
||||
) {
|
||||
a.addFields(measurement, tags, fields, telegraf.Untyped, timestamp...)
|
||||
a.addMeasurement(measurement, tags, fields, telegraf.Untyped, timestamp...)
|
||||
}
|
||||
|
||||
func (a *Accumulator) AddCounter(
|
||||
|
|
@ -156,7 +156,7 @@ func (a *Accumulator) AddCounter(
|
|||
tags map[string]string,
|
||||
timestamp ...time.Time,
|
||||
) {
|
||||
a.addFields(measurement, tags, fields, telegraf.Counter, timestamp...)
|
||||
a.addMeasurement(measurement, tags, fields, telegraf.Counter, timestamp...)
|
||||
}
|
||||
|
||||
func (a *Accumulator) AddGauge(
|
||||
|
|
@ -165,12 +165,12 @@ func (a *Accumulator) AddGauge(
|
|||
tags map[string]string,
|
||||
timestamp ...time.Time,
|
||||
) {
|
||||
a.addFields(measurement, tags, fields, telegraf.Gauge, timestamp...)
|
||||
a.addMeasurement(measurement, tags, fields, telegraf.Gauge, timestamp...)
|
||||
}
|
||||
|
||||
func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
|
||||
for _, m := range metrics {
|
||||
a.addFields(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
|
||||
a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -180,7 +180,7 @@ func (a *Accumulator) AddSummary(
|
|||
tags map[string]string,
|
||||
timestamp ...time.Time,
|
||||
) {
|
||||
a.addFields(measurement, tags, fields, telegraf.Summary, timestamp...)
|
||||
a.addMeasurement(measurement, tags, fields, telegraf.Summary, timestamp...)
|
||||
}
|
||||
|
||||
func (a *Accumulator) AddHistogram(
|
||||
|
|
@ -189,11 +189,11 @@ func (a *Accumulator) AddHistogram(
|
|||
tags map[string]string,
|
||||
timestamp ...time.Time,
|
||||
) {
|
||||
a.addFields(measurement, tags, fields, telegraf.Histogram, timestamp...)
|
||||
a.addMeasurement(measurement, tags, fields, telegraf.Histogram, timestamp...)
|
||||
}
|
||||
|
||||
func (a *Accumulator) AddMetric(m telegraf.Metric) {
|
||||
a.addFields(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
|
||||
a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
|
||||
}
|
||||
|
||||
func (a *Accumulator) WithTracking(_ int) telegraf.TrackingAccumulator {
|
||||
|
|
@ -296,7 +296,7 @@ func (a *Accumulator) TagValue(measurement string, key string) string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// Calls the given Gather function and returns the first error found.
|
||||
// GatherError calls the given Gather function and returns the first error found.
|
||||
func (a *Accumulator) GatherError(gf func(telegraf.Accumulator) error) error {
|
||||
if err := gf(a); err != nil {
|
||||
return err
|
||||
|
|
@ -529,7 +529,7 @@ func (a *Accumulator) HasInt32Field(measurement string, field string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// HasStringField returns true if the measurement has an String value
|
||||
// HasStringField returns true if the measurement has a String value
|
||||
func (a *Accumulator) HasStringField(measurement string, field string) bool {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
|
|
@ -704,14 +704,14 @@ func (a *Accumulator) StringField(measurement string, field string) (string, boo
|
|||
}
|
||||
|
||||
// BoolField returns the bool value of the given measurement and field or false.
|
||||
func (a *Accumulator) BoolField(measurement string, field string) (bool, bool) {
|
||||
func (a *Accumulator) BoolField(measurement string, field string) (v bool, ok bool) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
for _, p := range a.Metrics {
|
||||
if p.Measurement == measurement {
|
||||
for fieldname, value := range p.Fields {
|
||||
if fieldname == field {
|
||||
v, ok := value.(bool)
|
||||
v, ok = value.(bool)
|
||||
return v, ok
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue