diff --git a/agent/accumulator.go b/agent/accumulator.go index 7e3ae9cee..9105c1761 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -37,7 +37,7 @@ func (ac *accumulator) AddFields( tags map[string]string, t ...time.Time, ) { - ac.addFields(measurement, tags, fields, telegraf.Untyped, t...) + ac.addMeasurement(measurement, tags, fields, telegraf.Untyped, t...) } func (ac *accumulator) AddGauge( @@ -46,7 +46,7 @@ func (ac *accumulator) AddGauge( tags map[string]string, t ...time.Time, ) { - ac.addFields(measurement, tags, fields, telegraf.Gauge, t...) + ac.addMeasurement(measurement, tags, fields, telegraf.Gauge, t...) } func (ac *accumulator) AddCounter( @@ -55,7 +55,7 @@ func (ac *accumulator) AddCounter( tags map[string]string, t ...time.Time, ) { - ac.addFields(measurement, tags, fields, telegraf.Counter, t...) + ac.addMeasurement(measurement, tags, fields, telegraf.Counter, t...) } func (ac *accumulator) AddSummary( @@ -64,7 +64,7 @@ func (ac *accumulator) AddSummary( tags map[string]string, t ...time.Time, ) { - ac.addFields(measurement, tags, fields, telegraf.Summary, t...) + ac.addMeasurement(measurement, tags, fields, telegraf.Summary, t...) } func (ac *accumulator) AddHistogram( @@ -73,7 +73,7 @@ func (ac *accumulator) AddHistogram( tags map[string]string, t ...time.Time, ) { - ac.addFields(measurement, tags, fields, telegraf.Histogram, t...) + ac.addMeasurement(measurement, tags, fields, telegraf.Histogram, t...) } func (ac *accumulator) AddMetric(m telegraf.Metric) { @@ -83,7 +83,7 @@ func (ac *accumulator) AddMetric(m telegraf.Metric) { } } -func (ac *accumulator) addFields( +func (ac *accumulator) addMeasurement( measurement string, tags map[string]string, fields map[string]interface{}, diff --git a/agent/agent.go b/agent/agent.go index 4aedfc055..aca1d4656 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -58,7 +58,7 @@ type processorUnit struct { } // aggregatorUnit is a group of Aggregators and their source and sink channels. -// Typically the aggregators write to a processor channel and pass the original +// Typically, the aggregators write to a processor channel and pass the original // metrics to the output channel. The sink channels may be the same channel. // ┌────────────┐ @@ -281,46 +281,7 @@ func (a *Agent) runInputs( ) { var wg sync.WaitGroup for _, input := range unit.inputs { - // Overwrite agent interval if this plugin has its own. - interval := time.Duration(a.Config.Agent.Interval) - if input.Config.Interval != 0 { - interval = input.Config.Interval - } - - // Overwrite agent precision if this plugin has its own. - precision := time.Duration(a.Config.Agent.Precision) - if input.Config.Precision != 0 { - precision = input.Config.Precision - } - - // Overwrite agent collection_jitter if this plugin has its own. - jitter := time.Duration(a.Config.Agent.CollectionJitter) - if input.Config.CollectionJitter != 0 { - jitter = input.Config.CollectionJitter - } - - // Overwrite agent collection_offset if this plugin has its own. - offset := time.Duration(a.Config.Agent.CollectionOffset) - if input.Config.CollectionOffset != 0 { - offset = input.Config.CollectionOffset - } - - var ticker Ticker - if a.Config.Agent.RoundInterval { - ticker = NewAlignedTicker(startTime, interval, jitter, offset) - } else { - ticker = NewUnalignedTicker(interval, jitter, offset) - } - defer ticker.Stop() - - acc := NewAccumulator(input, unit.dst) - acc.SetPrecision(getPrecision(precision, interval)) - - wg.Add(1) - go func(input *models.RunningInput) { - defer wg.Done() - a.gatherLoop(ctx, acc, input, ticker, interval) - }(input) + a.runInput(ctx, startTime, unit, input, &wg) } wg.Wait() @@ -332,6 +293,49 @@ func (a *Agent) runInputs( log.Printf("D! [agent] Input channel closed") } +func (a *Agent) runInput(ctx context.Context, startTime time.Time, unit *inputUnit, input *models.RunningInput, wg *sync.WaitGroup) { + // Overwrite agent interval if this plugin has its own. + interval := time.Duration(a.Config.Agent.Interval) + if input.Config.Interval != 0 { + interval = input.Config.Interval + } + + // Overwrite agent precision if this plugin has its own. + precision := time.Duration(a.Config.Agent.Precision) + if input.Config.Precision != 0 { + precision = input.Config.Precision + } + + // Overwrite agent collection_jitter if this plugin has its own. + jitter := time.Duration(a.Config.Agent.CollectionJitter) + if input.Config.CollectionJitter != 0 { + jitter = input.Config.CollectionJitter + } + + // Overwrite agent collection_offset if this plugin has its own. + offset := time.Duration(a.Config.Agent.CollectionOffset) + if input.Config.CollectionOffset != 0 { + offset = input.Config.CollectionOffset + } + + var ticker Ticker + if a.Config.Agent.RoundInterval { + ticker = NewAlignedTicker(startTime, interval, jitter, offset) + } else { + ticker = NewUnalignedTicker(interval, jitter, offset) + } + defer ticker.Stop() + + acc := NewAccumulator(input, unit.dst) + acc.SetPrecision(getPrecision(precision, interval)) + + wg.Add(1) + go func(input *models.RunningInput) { + defer wg.Done() + a.gatherLoop(ctx, acc, input, ticker, interval) + }(input) +} + // testStartInputs is a variation of startInputs for use in --test and --once // mode. It differs by logging Start errors and returning only plugins // successfully started. @@ -377,6 +381,7 @@ func (a *Agent) testRunInputs( nul := make(chan telegraf.Metric) go func() { + //nolint:revive // empty block needed here for range nul { } }() @@ -721,7 +726,7 @@ func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput) err = output.Output.Connect() if err != nil { - return fmt.Errorf("Error connecting to output %q: %w", output.LogName(), err) + return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err) } } log.Printf("D! [agent] Successfully connected to %s", output.LogName()) @@ -882,7 +887,7 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error { } }() - err := a.test(ctx, wait, src) + err := a.runTest(ctx, wait, src) if err != nil { return err } @@ -895,10 +900,10 @@ func (a *Agent) Test(ctx context.Context, wait time.Duration) error { return nil } -// Test runs the agent and performs a single gather sending output to the -// outputF. After gathering pauses for the wait duration to allow service +// runTest runs the agent and performs a single gather sending output to the +// outputC. After gathering pauses for the wait duration to allow service // inputs to run. -func (a *Agent) test(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error { +func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error { log.Printf("D! [agent] Initializing plugins") err := a.initPlugins() if err != nil { @@ -971,7 +976,7 @@ func (a *Agent) test(ctx context.Context, wait time.Duration, outputC chan<- tel // Once runs the full agent for a single gather. func (a *Agent) Once(ctx context.Context, wait time.Duration) error { - err := a.once(ctx, wait) + err := a.runOnce(ctx, wait) if err != nil { return err } @@ -990,10 +995,10 @@ func (a *Agent) Once(ctx context.Context, wait time.Duration) error { return nil } -// On runs the agent and performs a single gather sending output to the -// outputF. After gathering pauses for the wait duration to allow service +// runOnce runs the agent and performs a single gather sending output to the +// outputC. After gathering pauses for the wait duration to allow service // inputs to run. -func (a *Agent) once(ctx context.Context, wait time.Duration) error { +func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error { log.Printf("D! [agent] Initializing plugins") err := a.initPlugins() if err != nil { @@ -1094,6 +1099,7 @@ func getPrecision(precision, interval time.Duration) time.Duration { // panicRecover displays an error if an input panics. func panicRecover(input *models.RunningInput) { + //nolint:revive // recover is called inside a deferred function if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) diff --git a/config/config.go b/config/config.go index 34f987ab2..36b65aa01 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,8 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -31,13 +33,11 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers" - "github.com/influxdata/toml" - "github.com/influxdata/toml/ast" ) var ( // envVarRe is a regex to find environment variables in the config file - envVarRe = regexp.MustCompile(`\$\{(\w+)\}|\$(\w+)`) + envVarRe = regexp.MustCompile(`\${(\w+)}|\$(\w+)`) envVarEscaper = strings.NewReplacer( `"`, `\"`, @@ -134,7 +134,7 @@ type AgentConfig struct { // specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s). // Valid time units are "ns", "us" (or "µs"), "ms", "s". // - // By default or when set to "0s", precision will be set to the same + // By default, or when set to "0s", precision will be set to the same // timestamp order as the collection interval, with the maximum being 1s: // ie, when interval = "10s", precision will be "1s" // when interval = "250ms", precision will be "1ms" @@ -150,7 +150,7 @@ type AgentConfig struct { CollectionJitter Duration // CollectionOffset is used to shift the collection by the given amount. - // This can be be used to avoid many plugins querying constraint devices + // This can be used to avoid many plugins querying constraint devices // at the same time by manually scheduling them in time. CollectionOffset Duration @@ -163,7 +163,7 @@ type AgentConfig struct { // ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s FlushJitter Duration - // MetricBatchSize is the maximum number of metrics that is wrote to an + // MetricBatchSize is the maximum number of metrics that is written to an // output plugin in one call. MetricBatchSize int @@ -357,7 +357,7 @@ func getDefaultConfigPath() (string, error) { } // if we got here, we didn't find a file in a default location - return "", fmt.Errorf("No config file specified, and could not find one"+ + return "", fmt.Errorf("no config file specified, and could not find one"+ " in $TELEGRAF_CONFIG_PATH, %s, or %s", homefile, etcfile) } @@ -377,11 +377,11 @@ func (c *Config) LoadConfig(path string) error { } data, err := LoadConfigFile(path) if err != nil { - return fmt.Errorf("Error loading config file %s: %w", path, err) + return fmt.Errorf("error loading config file %s: %w", path, err) } if err = c.LoadConfigData(data); err != nil { - return fmt.Errorf("Error loading config file %s: %w", path, err) + return fmt.Errorf("error loading config file %s: %w", path, err) } return nil } @@ -390,7 +390,7 @@ func (c *Config) LoadConfig(path string) error { func (c *Config) LoadConfigData(data []byte) error { tbl, err := parseConfig(data) if err != nil { - return fmt.Errorf("Error parsing data: %s", err) + return fmt.Errorf("error parsing data: %s", err) } // Parse tags tables first: @@ -485,7 +485,7 @@ func (c *Config) LoadConfigData(data []byte) error { } } default: - return fmt.Errorf("Unsupported config format: %s", + return fmt.Errorf("unsupported config format: %s", pluginName) } if len(c.UnusedFields) > 0 { @@ -502,7 +502,7 @@ func (c *Config) LoadConfigData(data []byte) error { } } default: - return fmt.Errorf("Unsupported config format: %s", + return fmt.Errorf("unsupported config format: %s", pluginName) } if len(c.UnusedFields) > 0 { @@ -515,22 +515,22 @@ func (c *Config) LoadConfigData(data []byte) error { case []*ast.Table: for _, t := range pluginSubTable { if err = c.addAggregator(pluginName, t); err != nil { - return fmt.Errorf("Error parsing %s, %s", pluginName, err) + return fmt.Errorf("error parsing %s, %s", pluginName, err) } } default: - return fmt.Errorf("Unsupported config format: %s", + return fmt.Errorf("unsupported config format: %s", pluginName) } if len(c.UnusedFields) > 0 { return fmt.Errorf("plugin %s.%s: line %d: configuration specified the fields %q, but they weren't used", name, pluginName, subTable.Line, keys(c.UnusedFields)) } } - // Assume it's an input input for legacy config file support if no other + // Assume it's an input for legacy config file support if no other // identifiers are present default: if err = c.addInput(name, subTable); err != nil { - return fmt.Errorf("Error parsing %s, %s", name, err) + return fmt.Errorf("error parsing %s, %s", name, err) } } } @@ -668,7 +668,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error { printHistoricPluginDeprecationNotice("aggregators", name, di) return fmt.Errorf("plugin deprecated") } - return fmt.Errorf("Undefined but requested aggregator: %s", name) + return fmt.Errorf("undefined but requested aggregator: %s", name) } aggregator := creator() @@ -717,7 +717,7 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) creator, ok := parsers.Parsers[dataformat] if !ok { - return nil, fmt.Errorf("Undefined but requested parser: %s", dataformat) + return nil, fmt.Errorf("undefined but requested parser: %s", dataformat) } parser := creator(parentname) @@ -743,7 +743,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { printHistoricPluginDeprecationNotice("processors", name, di) return fmt.Errorf("plugin deprecated") } - return fmt.Errorf("Undefined but requested processor: %s", name) + return fmt.Errorf("undefined but requested processor: %s", name) } streamingProcessor := creator() @@ -788,7 +788,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { }) } - // Setup the processor + // Set up the processor if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil { return err } @@ -895,7 +895,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return fmt.Errorf("plugin deprecated") } - return fmt.Errorf("Undefined but requested input: %s", name) + return fmt.Errorf("undefined but requested input: %s", name) } input := creator() @@ -1067,8 +1067,8 @@ func (c *Config) buildFilter(tbl *ast.Table) (models.Filter, error) { c.getFieldStringSlice(tbl, "drop", &f.FieldDrop) c.getFieldStringSlice(tbl, "fielddrop", &f.FieldDrop) - c.getFieldTagFilter(tbl, "tagpass", &f.TagPass) - c.getFieldTagFilter(tbl, "tagdrop", &f.TagDrop) + c.getFieldTagFilter(tbl, "tagpass", &f.TagPassFilters) + c.getFieldTagFilter(tbl, "tagdrop", &f.TagDropFilters) c.getFieldStringSlice(tbl, "tagexclude", &f.TagExclude) c.getFieldStringSlice(tbl, "taginclude", &f.TagInclude) @@ -1172,7 +1172,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) } // buildOutput parses output specific items from the ast.Table, -// builds the filter and returns an +// builds the filter and returns a // models.OutputConfig to be inserted into models.RunningInput // Note: error exists in the return for future calls that might require error func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { @@ -1370,7 +1370,7 @@ func (c *Config) getFieldTagFilter(tbl *ast.Table, fieldName string, target *[]m tagFilter := models.TagFilter{Name: name} for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - tagFilter.Filter = append(tagFilter.Filter, str.Value) + tagFilter.Values = append(tagFilter.Values, str.Value) } } *target = append(*target, tagFilter) diff --git a/config/config_test.go b/config/config_test.go index 165af0de6..3b3592017 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -70,16 +70,16 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) { NamePass: []string{"metricname1", "ip_192.168.1.1_name"}, FieldDrop: []string{"other", "stuff"}, FieldPass: []string{"some", "strings"}, - TagDrop: []models.TagFilter{ + TagDropFilters: []models.TagFilter{ { Name: "badtag", - Filter: []string{"othertag"}, + Values: []string{"othertag"}, }, }, - TagPass: []models.TagFilter{ + TagPassFilters: []models.TagFilter{ { Name: "goodtag", - Filter: []string{"mytag"}, + Values: []string{"mytag"}, }, }, } @@ -110,16 +110,16 @@ func TestConfig_LoadSingleInput(t *testing.T) { NamePass: []string{"metricname1"}, FieldDrop: []string{"other", "stuff"}, FieldPass: []string{"some", "strings"}, - TagDrop: []models.TagFilter{ + TagDropFilters: []models.TagFilter{ { Name: "badtag", - Filter: []string{"othertag"}, + Values: []string{"othertag"}, }, }, - TagPass: []models.TagFilter{ + TagPassFilters: []models.TagFilter{ { Name: "goodtag", - Filter: []string{"mytag"}, + Values: []string{"mytag"}, }, }, } @@ -155,16 +155,16 @@ func TestConfig_LoadDirectory(t *testing.T) { NamePass: []string{"metricname1"}, FieldDrop: []string{"other", "stuff"}, FieldPass: []string{"some", "strings"}, - TagDrop: []models.TagFilter{ + TagDropFilters: []models.TagFilter{ { Name: "badtag", - Filter: []string{"othertag"}, + Values: []string{"othertag"}, }, }, - TagPass: []models.TagFilter{ + TagPassFilters: []models.TagFilter{ { Name: "goodtag", - Filter: []string{"mytag"}, + Values: []string{"mytag"}, }, }, } @@ -199,16 +199,16 @@ func TestConfig_LoadDirectory(t *testing.T) { NamePass: []string{"metricname1"}, FieldDrop: []string{"other", "stuff"}, FieldPass: []string{"some", "strings"}, - TagDrop: []models.TagFilter{ + TagDropFilters: []models.TagFilter{ { Name: "badtag", - Filter: []string{"othertag"}, + Values: []string{"othertag"}, }, }, - TagPass: []models.TagFilter{ + TagPassFilters: []models.TagFilter{ { Name: "goodtag", - Filter: []string{"mytag"}, + Values: []string{"mytag"}, }, }, } @@ -288,19 +288,19 @@ func TestConfig_FieldNotDefined(t *testing.T) { c := NewConfig() err := c.LoadConfig("./testdata/invalid_field.toml") require.Error(t, err, "invalid field name") - require.Equal(t, "Error loading config file ./testdata/invalid_field.toml: plugin inputs.http_listener_v2: line 1: configuration specified the fields [\"not_a_field\"], but they weren't used", err.Error()) + require.Equal(t, "error loading config file ./testdata/invalid_field.toml: plugin inputs.http_listener_v2: line 1: configuration specified the fields [\"not_a_field\"], but they weren't used", err.Error()) } func TestConfig_WrongFieldType(t *testing.T) { c := NewConfig() err := c.LoadConfig("./testdata/wrong_field_type.toml") require.Error(t, err, "invalid field type") - require.Equal(t, "Error loading config file ./testdata/wrong_field_type.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Port) cannot unmarshal TOML string into int", err.Error()) + require.Equal(t, "error loading config file ./testdata/wrong_field_type.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Port) cannot unmarshal TOML string into int", err.Error()) c = NewConfig() err = c.LoadConfig("./testdata/wrong_field_type2.toml") require.Error(t, err, "invalid field type2") - require.Equal(t, "Error loading config file ./testdata/wrong_field_type2.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Methods) cannot unmarshal TOML string into []string", err.Error()) + require.Equal(t, "error loading config file ./testdata/wrong_field_type2.toml: error parsing http_listener_v2, line 2: (config.MockupInputPlugin.Methods) cannot unmarshal TOML string into []string", err.Error()) } func TestConfig_InlineTables(t *testing.T) { @@ -333,7 +333,7 @@ func TestConfig_BadOrdering(t *testing.T) { c := NewConfig() err := c.LoadConfig("./testdata/non_slice_slice.toml") require.Error(t, err, "bad ordering") - require.Equal(t, "Error loading config file ./testdata/non_slice_slice.toml: error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)", err.Error()) + require.Equal(t, "error loading config file ./testdata/non_slice_slice.toml: error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)", err.Error()) } func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) { @@ -359,7 +359,7 @@ func TestConfig_URLRetries3Fails(t *testing.T) { })) defer ts.Close() - expected := fmt.Sprintf("Error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL) + expected := fmt.Sprintf("error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL) c := NewConfig() err := c.LoadConfig(ts.URL) @@ -408,10 +408,10 @@ func TestConfig_URLLikeFileName(t *testing.T) { require.Error(t, err) if runtime.GOOS == "windows" { - // The error file not found error message is different on windows - require.Equal(t, "Error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: The system cannot find the file specified.", err.Error()) + // The error file not found error message is different on Windows + require.Equal(t, "error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: The system cannot find the file specified.", err.Error()) } else { - require.Equal(t, "Error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: no such file or directory", err.Error()) + require.Equal(t, "error loading config file http:##www.example.com.conf: open http:##www.example.com.conf: no such file or directory", err.Error()) } } @@ -786,7 +786,7 @@ type MockupInputPluginParserOld struct { func (m *MockupInputPluginParserOld) SampleConfig() string { return "Mockup old parser test plugin" } -func (m *MockupInputPluginParserOld) Gather(acc telegraf.Accumulator) error { +func (m *MockupInputPluginParserOld) Gather(_ telegraf.Accumulator) error { return nil } func (m *MockupInputPluginParserOld) SetParser(parser parsers.Parser) { @@ -805,7 +805,7 @@ type MockupInputPluginParserNew struct { func (m *MockupInputPluginParserNew) SampleConfig() string { return "Mockup old parser test plugin" } -func (m *MockupInputPluginParserNew) Gather(acc telegraf.Accumulator) error { +func (m *MockupInputPluginParserNew) Gather(_ telegraf.Accumulator) error { return nil } func (m *MockupInputPluginParserNew) SetParser(parser telegraf.Parser) { @@ -836,7 +836,7 @@ type MockupInputPlugin struct { func (m *MockupInputPlugin) SampleConfig() string { return "Mockup test input plugin" } -func (m *MockupInputPlugin) Gather(acc telegraf.Accumulator) error { +func (m *MockupInputPlugin) Gather(_ telegraf.Accumulator) error { return nil } func (m *MockupInputPlugin) SetParser(parser telegraf.Parser) { @@ -849,7 +849,7 @@ type MockupProcessorPluginParser struct { ParserFunc telegraf.ParserFunc } -func (m *MockupProcessorPluginParser) Start(acc telegraf.Accumulator) error { +func (m *MockupProcessorPluginParser) Start(_ telegraf.Accumulator) error { return nil } func (m *MockupProcessorPluginParser) Stop() error { @@ -858,10 +858,10 @@ func (m *MockupProcessorPluginParser) Stop() error { func (m *MockupProcessorPluginParser) SampleConfig() string { return "Mockup test processor plugin with parser" } -func (m *MockupProcessorPluginParser) Apply(in ...telegraf.Metric) []telegraf.Metric { +func (m *MockupProcessorPluginParser) Apply(_ ...telegraf.Metric) []telegraf.Metric { return nil } -func (m *MockupProcessorPluginParser) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { +func (m *MockupProcessorPluginParser) Add(_ telegraf.Metric, _ telegraf.Accumulator) error { return nil } func (m *MockupProcessorPluginParser) SetParser(parser telegraf.Parser) { @@ -890,7 +890,7 @@ func (m *MockupOuputPlugin) Close() error { func (m *MockupOuputPlugin) SampleConfig() string { return "Mockup test output plugin" } -func (m *MockupOuputPlugin) Write(metrics []telegraf.Metric) error { +func (m *MockupOuputPlugin) Write(_ []telegraf.Metric) error { return nil } diff --git a/internal/docker/docker.go b/internal/docker/docker.go index 1808944ae..d682d7150 100644 --- a/internal/docker/docker.go +++ b/internal/docker/docker.go @@ -2,10 +2,9 @@ package docker import "strings" -// Adapts some of the logic from the actual Docker library's image parsing -// routines: +// ParseImage Adapts some of the logic from the actual Docker library's image parsing routines: // https://github.com/docker/distribution/blob/release/2.7/reference/normalize.go -func ParseImage(image string) (string, string) { +func ParseImage(image string) (imageName string, imageVersion string) { domain := "" remainder := "" @@ -17,9 +16,8 @@ func ParseImage(image string) (string, string) { domain, remainder = image[:i], image[i+1:] } - imageName := "" - imageVersion := "unknown" - + imageName = "" + imageVersion = "unknown" i = strings.LastIndex(remainder, ":") if i > -1 { imageVersion = remainder[i+1:] diff --git a/models/buffer.go b/models/buffer.go index 1e6ef10fd..ac559a86e 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -104,7 +104,7 @@ func (b *Buffer) metricDropped(metric telegraf.Metric) { metric.Reject() } -func (b *Buffer) add(m telegraf.Metric) int { +func (b *Buffer) addMetric(m telegraf.Metric) int { dropped := 0 // Check if Buffer is full if b.size == b.cap { @@ -137,7 +137,7 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) int { dropped := 0 for i := range metrics { - if n := b.add(metrics[i]); n != 0 { + if n := b.addMetric(metrics[i]); n != 0 { dropped += n } } diff --git a/models/filter.go b/models/filter.go index 8103c2317..b4d871236 100644 --- a/models/filter.go +++ b/models/filter.go @@ -10,29 +10,29 @@ import ( // TagFilter is the name of a tag, and the values on which to filter type TagFilter struct { Name string - Filter []string + Values []string filter filter.Filter } // Filter containing drop/pass and tagdrop/tagpass rules type Filter struct { - NameDrop []string - nameDrop filter.Filter - NamePass []string - namePass filter.Filter + NameDrop []string + nameDropFilter filter.Filter + NamePass []string + namePassFilter filter.Filter - FieldDrop []string - fieldDrop filter.Filter - FieldPass []string - fieldPass filter.Filter + FieldDrop []string + fieldDropFilter filter.Filter + FieldPass []string + fieldPassFilter filter.Filter - TagDrop []TagFilter - TagPass []TagFilter + TagDropFilters []TagFilter + TagPassFilters []TagFilter - TagExclude []string - tagExclude filter.Filter - TagInclude []string - tagInclude filter.Filter + TagExclude []string + tagExcludeFilter filter.Filter + TagInclude []string + tagIncludeFilter filter.Filter isActive bool } @@ -45,48 +45,48 @@ func (f *Filter) Compile() error { len(f.FieldPass) == 0 && len(f.TagInclude) == 0 && len(f.TagExclude) == 0 && - len(f.TagPass) == 0 && - len(f.TagDrop) == 0 { + len(f.TagPassFilters) == 0 && + len(f.TagDropFilters) == 0 { return nil } f.isActive = true var err error - f.nameDrop, err = filter.Compile(f.NameDrop) + f.nameDropFilter, err = filter.Compile(f.NameDrop) if err != nil { return fmt.Errorf("error compiling 'namedrop', %s", err) } - f.namePass, err = filter.Compile(f.NamePass) + f.namePassFilter, err = filter.Compile(f.NamePass) if err != nil { return fmt.Errorf("error compiling 'namepass', %s", err) } - f.fieldDrop, err = filter.Compile(f.FieldDrop) + f.fieldDropFilter, err = filter.Compile(f.FieldDrop) if err != nil { return fmt.Errorf("error compiling 'fielddrop', %s", err) } - f.fieldPass, err = filter.Compile(f.FieldPass) + f.fieldPassFilter, err = filter.Compile(f.FieldPass) if err != nil { return fmt.Errorf("error compiling 'fieldpass', %s", err) } - f.tagExclude, err = filter.Compile(f.TagExclude) + f.tagExcludeFilter, err = filter.Compile(f.TagExclude) if err != nil { return fmt.Errorf("error compiling 'tagexclude', %s", err) } - f.tagInclude, err = filter.Compile(f.TagInclude) + f.tagIncludeFilter, err = filter.Compile(f.TagInclude) if err != nil { return fmt.Errorf("error compiling 'taginclude', %s", err) } - for i := range f.TagDrop { - f.TagDrop[i].filter, err = filter.Compile(f.TagDrop[i].Filter) + for i := range f.TagDropFilters { + f.TagDropFilters[i].filter, err = filter.Compile(f.TagDropFilters[i].Values) if err != nil { return fmt.Errorf("error compiling 'tagdrop', %s", err) } } - for i := range f.TagPass { - f.TagPass[i].filter, err = filter.Compile(f.TagPass[i].Filter) + for i := range f.TagPassFilters { + f.TagPassFilters[i].filter, err = filter.Compile(f.TagPassFilters[i].Values) if err != nil { return fmt.Errorf("error compiling 'tagpass', %s", err) } @@ -128,46 +128,46 @@ func (f *Filter) IsActive() bool { return f.isActive } -// shouldNamePass returns true if the metric should pass, false if should drop +// shouldNamePass returns true if the metric should pass, false if it should drop // based on the drop/pass filter parameters func (f *Filter) shouldNamePass(key string) bool { pass := func(f *Filter) bool { - return f.namePass.Match(key) + return f.namePassFilter.Match(key) } drop := func(f *Filter) bool { - return !f.nameDrop.Match(key) + return !f.nameDropFilter.Match(key) } - if f.namePass != nil && f.nameDrop != nil { + if f.namePassFilter != nil && f.nameDropFilter != nil { return pass(f) && drop(f) - } else if f.namePass != nil { + } else if f.namePassFilter != nil { return pass(f) - } else if f.nameDrop != nil { + } else if f.nameDropFilter != nil { return drop(f) } return true } -// shouldFieldPass returns true if the metric should pass, false if should drop +// shouldFieldPass returns true if the metric should pass, false if it should drop // based on the drop/pass filter parameters func (f *Filter) shouldFieldPass(key string) bool { - if f.fieldPass != nil && f.fieldDrop != nil { - return f.fieldPass.Match(key) && !f.fieldDrop.Match(key) - } else if f.fieldPass != nil { - return f.fieldPass.Match(key) - } else if f.fieldDrop != nil { - return !f.fieldDrop.Match(key) + if f.fieldPassFilter != nil && f.fieldDropFilter != nil { + return f.fieldPassFilter.Match(key) && !f.fieldDropFilter.Match(key) + } else if f.fieldPassFilter != nil { + return f.fieldPassFilter.Match(key) + } else if f.fieldDropFilter != nil { + return !f.fieldDropFilter.Match(key) } return true } -// shouldTagsPass returns true if the metric should pass, false if should drop +// shouldTagsPass returns true if the metric should pass, false if it should drop // based on the tagdrop/tagpass filter parameters func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool { pass := func(f *Filter) bool { - for _, pat := range f.TagPass { + for _, pat := range f.TagPassFilters { if pat.filter == nil { continue } @@ -183,7 +183,7 @@ func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool { } drop := func(f *Filter) bool { - for _, pat := range f.TagDrop { + for _, pat := range f.TagDropFilters { if pat.filter == nil { continue } @@ -200,13 +200,13 @@ func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool { // Add additional logic in case where both parameters are set. // see: https://github.com/influxdata/telegraf/issues/2860 - if f.TagPass != nil && f.TagDrop != nil { + if f.TagPassFilters != nil && f.TagDropFilters != nil { // return true only in case when tag pass and won't be dropped (true, true). // in case when the same tag should be passed and dropped it will be dropped (true, false). return pass(f) && drop(f) - } else if f.TagPass != nil { + } else if f.TagPassFilters != nil { return pass(f) - } else if f.TagDrop != nil { + } else if f.TagDropFilters != nil { return drop(f) } @@ -230,9 +230,9 @@ func (f *Filter) filterFields(metric telegraf.Metric) { // filterTags removes tags according to taginclude/tagexclude. func (f *Filter) filterTags(metric telegraf.Metric) { filterKeys := []string{} - if f.tagInclude != nil { + if f.tagIncludeFilter != nil { for _, tag := range metric.TagList() { - if !f.tagInclude.Match(tag.Key) { + if !f.tagIncludeFilter.Match(tag.Key) { filterKeys = append(filterKeys, tag.Key) } } @@ -241,9 +241,9 @@ func (f *Filter) filterTags(metric telegraf.Metric) { metric.RemoveTag(key) } - if f.tagExclude != nil { + if f.tagExcludeFilter != nil { for _, tag := range metric.TagList() { - if f.tagExclude.Match(tag.Key) { + if f.tagExcludeFilter.Match(tag.Key) { filterKeys = append(filterKeys, tag.Key) } } diff --git a/models/filter_test.go b/models/filter_test.go index aa32e0951..92e186887 100644 --- a/models/filter_test.go +++ b/models/filter_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func TestFilter_ApplyEmpty(t *testing.T) { @@ -26,11 +27,11 @@ func TestFilter_ApplyTagsDontPass(t *testing.T) { filters := []TagFilter{ { Name: "cpu", - Filter: []string{"cpu-*"}, + Values: []string{"cpu-*"}, }, } f := Filter{ - TagDrop: filters, + TagDropFilters: filters, } require.NoError(t, f.Compile()) require.NoError(t, f.Compile()) @@ -243,14 +244,14 @@ func TestFilter_TagPass(t *testing.T) { filters := []TagFilter{ { Name: "cpu", - Filter: []string{"cpu-*"}, + Values: []string{"cpu-*"}, }, { Name: "mem", - Filter: []string{"mem_free"}, + Values: []string{"mem_free"}, }} f := Filter{ - TagPass: filters, + TagPassFilters: filters, } require.NoError(t, f.Compile()) @@ -287,14 +288,14 @@ func TestFilter_TagDrop(t *testing.T) { filters := []TagFilter{ { Name: "cpu", - Filter: []string{"cpu-*"}, + Values: []string{"cpu-*"}, }, { Name: "mem", - Filter: []string{"mem_free"}, + Values: []string{"mem_free"}, }} f := Filter{ - TagDrop: filters, + TagDropFilters: filters, } require.NoError(t, f.Compile()) @@ -445,24 +446,24 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) { filterPass := []TagFilter{ { Name: "tag1", - Filter: []string{"1", "4"}, + Values: []string{"1", "4"}, }, } filterDrop := []TagFilter{ { Name: "tag1", - Filter: []string{"4"}, + Values: []string{"4"}, }, { Name: "tag2", - Filter: []string{"3"}, + Values: []string{"3"}, }, } f := Filter{ - TagDrop: filterDrop, - TagPass: filterPass, + TagDropFilters: filterDrop, + TagPassFilters: filterPass, } require.NoError(t, f.Compile()) diff --git a/models/running_aggregator.go b/models/running_aggregator.go index b1dc621ba..9abb29f21 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -163,15 +163,11 @@ func (r *RunningAggregator) Push(acc telegraf.Accumulator) { until := r.periodEnd.Add(r.Config.Period) r.UpdateWindow(since, until) - r.push(acc) - r.Aggregator.Reset() -} - -func (r *RunningAggregator) push(acc telegraf.Accumulator) { start := time.Now() r.Aggregator.Push(acc) elapsed := time.Since(start) r.PushTime.Incr(elapsed.Nanoseconds()) + r.Aggregator.Reset() } func (r *RunningAggregator) Log() telegraf.Logger { diff --git a/models/running_output.go b/models/running_output.go index 6f5f8c0a8..3b85649fc 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -198,7 +198,7 @@ func (r *RunningOutput) Write() error { break } - err := r.write(batch) + err := r.writeMetrics(batch) if err != nil { r.buffer.Reject(batch) return err @@ -215,7 +215,7 @@ func (r *RunningOutput) WriteBatch() error { return nil } - err := r.write(batch) + err := r.writeMetrics(batch) if err != nil { r.buffer.Reject(batch) return err @@ -233,7 +233,7 @@ func (r *RunningOutput) Close() { } } -func (r *RunningOutput) write(metrics []telegraf.Metric) error { +func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error { dropped := atomic.LoadInt64(&r.droppedMetrics) if dropped > 0 { r.log.Warnf("Metric buffer overflow; %d metrics have been dropped", dropped) diff --git a/plugins/common/encoding/decoder.go b/plugins/common/encoding/decoder.go index 8bc3b7f92..7bc14ae13 100644 --- a/plugins/common/encoding/decoder.go +++ b/plugins/common/encoding/decoder.go @@ -7,13 +7,13 @@ import ( "golang.org/x/text/encoding/unicode" ) -// NewDecoder returns a x/text Decoder for the specified text encoding. The +// NewDecoder returns an x/text Decoder for the specified text encoding. The // Decoder converts a character encoding into utf-8 bytes. If a BOM is found -// it will be converted into a utf-8 BOM, you can use +// it will be converted into an utf-8 BOM, you can use // github.com/dimchansky/utfbom to strip the BOM. // // The "none" or "" encoding will pass through bytes unchecked. Use the utf-8 -// encoding if you want invalid bytes replaced using the the unicode +// encoding if you want invalid bytes replaced using the unicode // replacement character. // // Detection of utf-16 endianness using the BOM is not currently provided due @@ -22,13 +22,13 @@ import ( func NewDecoder(enc string) (*Decoder, error) { switch enc { case "utf-8": - return &Decoder{Transformer: unicode.UTF8.NewDecoder()}, nil + return createDecoder(unicode.UTF8.NewDecoder()), nil case "utf-16le": - return newDecoder(unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()), nil + return createDecoder(unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder()), nil case "utf-16be": - return newDecoder(unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder()), nil + return createDecoder(unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder()), nil case "none", "": - return newDecoder(encoding.Nop.NewDecoder()), nil + return createDecoder(encoding.Nop.NewDecoder()), nil } return nil, errors.New("unknown character encoding") } diff --git a/plugins/common/encoding/decoder_reader.go b/plugins/common/encoding/decoder_reader.go index 586865cf7..cbd42db8e 100644 --- a/plugins/common/encoding/decoder_reader.go +++ b/plugins/common/encoding/decoder_reader.go @@ -10,7 +10,7 @@ import ( // Other than resetting r.err and r.transformComplete in Read() this // was copied from x/text -func newDecoder(t transform.Transformer) *Decoder { +func createDecoder(t transform.Transformer) *Decoder { return &Decoder{Transformer: t} } @@ -73,7 +73,7 @@ type Reader struct { src0, src1 int // transformComplete is whether the transformation is complete, - // regardless of whether or not it was successful. + // regardless of whether it was successful. transformComplete bool } diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 4da3a76fc..559efef44 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -9,8 +9,9 @@ import ( "testing" "time" - "github.com/influxdata/telegraf" "github.com/stretchr/testify/assert" + + "github.com/influxdata/telegraf" ) var ( @@ -78,7 +79,7 @@ func (a *Accumulator) ClearMetrics() { a.Metrics = make([]*Metric, 0) } -func (a *Accumulator) addFields( +func (a *Accumulator) addMeasurement( measurement string, tags map[string]string, fields map[string]interface{}, @@ -113,7 +114,6 @@ func (a *Accumulator) addFields( if len(timestamp) > 0 { t = timestamp[0] } else { - t = time.Now() if a.TimeFunc == nil { t = time.Now() } else { @@ -147,7 +147,7 @@ func (a *Accumulator) AddFields( tags map[string]string, timestamp ...time.Time, ) { - a.addFields(measurement, tags, fields, telegraf.Untyped, timestamp...) + a.addMeasurement(measurement, tags, fields, telegraf.Untyped, timestamp...) } func (a *Accumulator) AddCounter( @@ -156,7 +156,7 @@ func (a *Accumulator) AddCounter( tags map[string]string, timestamp ...time.Time, ) { - a.addFields(measurement, tags, fields, telegraf.Counter, timestamp...) + a.addMeasurement(measurement, tags, fields, telegraf.Counter, timestamp...) } func (a *Accumulator) AddGauge( @@ -165,12 +165,12 @@ func (a *Accumulator) AddGauge( tags map[string]string, timestamp ...time.Time, ) { - a.addFields(measurement, tags, fields, telegraf.Gauge, timestamp...) + a.addMeasurement(measurement, tags, fields, telegraf.Gauge, timestamp...) } func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { for _, m := range metrics { - a.addFields(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time()) + a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time()) } } @@ -180,7 +180,7 @@ func (a *Accumulator) AddSummary( tags map[string]string, timestamp ...time.Time, ) { - a.addFields(measurement, tags, fields, telegraf.Summary, timestamp...) + a.addMeasurement(measurement, tags, fields, telegraf.Summary, timestamp...) } func (a *Accumulator) AddHistogram( @@ -189,11 +189,11 @@ func (a *Accumulator) AddHistogram( tags map[string]string, timestamp ...time.Time, ) { - a.addFields(measurement, tags, fields, telegraf.Histogram, timestamp...) + a.addMeasurement(measurement, tags, fields, telegraf.Histogram, timestamp...) } func (a *Accumulator) AddMetric(m telegraf.Metric) { - a.addFields(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time()) + a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time()) } func (a *Accumulator) WithTracking(_ int) telegraf.TrackingAccumulator { @@ -296,7 +296,7 @@ func (a *Accumulator) TagValue(measurement string, key string) string { return "" } -// Calls the given Gather function and returns the first error found. +// GatherError calls the given Gather function and returns the first error found. func (a *Accumulator) GatherError(gf func(telegraf.Accumulator) error) error { if err := gf(a); err != nil { return err @@ -529,7 +529,7 @@ func (a *Accumulator) HasInt32Field(measurement string, field string) bool { return false } -// HasStringField returns true if the measurement has an String value +// HasStringField returns true if the measurement has a String value func (a *Accumulator) HasStringField(measurement string, field string) bool { a.Lock() defer a.Unlock() @@ -704,14 +704,14 @@ func (a *Accumulator) StringField(measurement string, field string) (string, boo } // BoolField returns the bool value of the given measurement and field or false. -func (a *Accumulator) BoolField(measurement string, field string) (bool, bool) { +func (a *Accumulator) BoolField(measurement string, field string) (v bool, ok bool) { a.Lock() defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { for fieldname, value := range p.Fields { if fieldname == field { - v, ok := value.(bool) + v, ok = value.(bool) return v, ok } }