feat(plugins): Allow overriding the log-level per plugin (#15677)

Author: Sven Rebhan, 2024-07-30 17:15:19 +02:00 (committed by GitHub)
parent 6af83217e2
commit f9f029e7e8
10 changed files with 94 additions and 41 deletions

config/config.go

@@ -971,26 +971,30 @@ func (c *Config) probeParser(parentcategory string, parentname string, table *as
 }
 
 func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
-	var dataformat string
-	c.getFieldString(table, "data_format", &dataformat)
-	if dataformat == "" {
-		dataformat = setDefaultParser(parentcategory, parentname)
+	conf := &models.ParserConfig{
+		Parent: parentname,
 	}
-
-	var influxParserType string
-	c.getFieldString(table, "influx_parser_type", &influxParserType)
-	if dataformat == "influx" && influxParserType == "upstream" {
-		dataformat = "influx_upstream"
+	c.getFieldString(table, "data_format", &conf.DataFormat)
+	if conf.DataFormat == "" {
+		conf.DataFormat = setDefaultParser(parentcategory, parentname)
+	} else if conf.DataFormat == "influx" {
+		var influxParserType string
+		c.getFieldString(table, "influx_parser_type", &influxParserType)
+		if influxParserType == "upstream" {
+			conf.DataFormat = "influx_upstream"
+		}
 	}
+	c.getFieldString(table, "log_level", &conf.LogLevel)
 
-	creator, ok := parsers.Parsers[dataformat]
+	creator, ok := parsers.Parsers[conf.DataFormat]
 	if !ok {
-		return nil, fmt.Errorf("undefined but requested parser: %s", dataformat)
+		return nil, fmt.Errorf("undefined but requested parser: %s", conf.DataFormat)
 	}
 	parser := creator(parentname)
 
 	// Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022)
-	if dataformat == "csv" && parentcategory == "inputs" {
+	if conf.DataFormat == "csv" && parentcategory == "inputs" {
 		if parentname == "exec" {
 			csvParser := parser.(*csv.Parser)
 			csvParser.ResetMode = "always"
@@ -1001,25 +1005,24 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
 		return nil, err
 	}
 
-	conf := &models.ParserConfig{
-		Parent: parentname,
-		DataFormat: dataformat,
-	}
 	running := models.NewRunningParser(parser, conf)
 	err := running.Init()
 	return running, err
 }
 
 func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) {
-	var dataformat string
-	c.getFieldString(table, "data_format", &dataformat)
-	if dataformat == "" {
-		dataformat = "influx"
+	conf := &models.SerializerConfig{
+		Parent: parentname,
 	}
+	c.getFieldString(table, "data_format", &conf.DataFormat)
+	if conf.DataFormat == "" {
+		conf.DataFormat = "influx"
+	}
+	c.getFieldString(table, "log_level", &conf.LogLevel)
 
-	creator, ok := serializers.Serializers[dataformat]
+	creator, ok := serializers.Serializers[conf.DataFormat]
 	if !ok {
-		return nil, fmt.Errorf("undefined but requested serializer: %s", dataformat)
+		return nil, fmt.Errorf("undefined but requested serializer: %s", conf.DataFormat)
 	}
 
 	serializer := creator()
@@ -1027,10 +1030,6 @@ func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.Run
 		return nil, err
 	}
 
-	conf := &models.SerializerConfig{
-		Parent: parentname,
-		DataFormat: dataformat,
-	}
 	running := models.NewRunningSerializer(serializer, conf)
 	err := running.Init()
 	return running, err
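
Because `addParser` and `addSerializer` read `log_level` from the plugin's own configuration table, the option that overrides an input's or output's log level also applies to the data-format parser or serializer created for it. A minimal sketch, with an illustrative `inputs.file` setup that is not part of this commit:

```toml
[[inputs.file]]
  files = ["example.json"]
  data_format = "json"
  ## Applies to this input and to the JSON parser built for it; all other
  ## plugins keep the agent-wide log level.
  log_level = "debug"
```
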
@@ -1336,6 +1335,7 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
 	c.getFieldString(tbl, "name_suffix", &conf.MeasurementSuffix)
 	c.getFieldString(tbl, "name_override", &conf.NameOverride)
 	c.getFieldString(tbl, "alias", &conf.Alias)
+	c.getFieldString(tbl, "log_level", &conf.LogLevel)
 
 	conf.Tags = make(map[string]string)
 	if node, ok := tbl.Fields["tags"]; ok {
@@ -1369,6 +1369,7 @@ func (c *Config) buildProcessor(category, name string, tbl *ast.Table) (*models.
 	c.getFieldInt64(tbl, "order", &conf.Order)
 	c.getFieldString(tbl, "alias", &conf.Alias)
+	c.getFieldString(tbl, "log_level", &conf.LogLevel)
 
 	if c.hasErrs() {
 		return nil, c.firstErr()
@@ -1478,6 +1479,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
 	c.getFieldString(tbl, "name_suffix", &cp.MeasurementSuffix)
 	c.getFieldString(tbl, "name_override", &cp.NameOverride)
 	c.getFieldString(tbl, "alias", &cp.Alias)
+	c.getFieldString(tbl, "log_level", &cp.LogLevel)
 
 	cp.Tags = make(map[string]string)
 	if node, ok := tbl.Fields["tags"]; ok {
@@ -1523,7 +1525,6 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
 	c.getFieldDuration(tbl, "flush_interval", &oc.FlushInterval)
 	c.getFieldDuration(tbl, "flush_jitter", &oc.FlushJitter)
-
 	c.getFieldInt(tbl, "metric_buffer_limit", &oc.MetricBufferLimit)
 	c.getFieldInt(tbl, "metric_batch_size", &oc.MetricBatchSize)
 	c.getFieldString(tbl, "alias", &oc.Alias)
@@ -1531,6 +1532,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
 	c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
 	c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
 	c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)
+	c.getFieldString(tbl, "log_level", &oc.LogLevel)
 
 	if c.hasErrs() {
 		return nil, c.firstErr()
@@ -1555,7 +1557,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
 		"fielddrop", "fieldexclude", "fieldinclude", "fieldpass", "flush_interval", "flush_jitter",
 		"grace",
 		"interval",
-		"lvm", // What is this used for?
+		"log_level", "lvm", // What is this used for?
 		"metric_batch_size", "metric_buffer_limit", "metricpass",
 		"name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator",
 		"order",

docs/CONFIGURATION.md

@@ -385,38 +385,32 @@ driven operation.
 Parameters that can be used with any input plugin:
 
 - **alias**: Name an instance of a plugin.
 - **interval**:
   Overrides the `interval` setting of the [agent][Agent] for the plugin. How
   often to gather this metric. Normal plugins use a single global interval, but
   if one particular input should be run less or more often, you can configure
   that here.
 - **precision**:
   Overrides the `precision` setting of the [agent][Agent] for the plugin.
   Collected metrics are rounded to the precision specified as an [interval][].
   When this value is set on a service input, multiple events occurring at the
   same timestamp may be merged by the output database.
 - **collection_jitter**:
   Overrides the `collection_jitter` setting of the [agent][Agent] for the
   plugin. Collection jitter is used to jitter the collection by a random
   [interval][]. The value must be non-zero to override the agent setting.
 - **collection_offset**:
   Overrides the `collection_offset` setting of the [agent][Agent] for the
   plugin. Collection offset is used to shift the collection by the given
   [interval][]. The value must be non-zero to override the agent setting.
 - **name_override**: Override the base name of the measurement. (Default is
   the name of the input).
 - **name_prefix**: Specifies a prefix to attach to the measurement name.
 - **name_suffix**: Specifies a suffix to attach to the measurement name.
 - **tags**: A map of tags to apply to a specific input's measurements.
+- **log_level**: Override the log-level for this plugin. Possible values are
+  `error`, `warn`, `info` and `debug`.
 
 The [metric filtering][] parameters can be used to limit what metrics are
 emitted from the input plugin.
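
For example, a minimal sketch (plugin and values are illustrative): raising one input's verbosity while the rest of the agent stays at the global level:

```toml
[[inputs.cpu]]
  ## Collect every 30s and log at debug, for this plugin only.
  interval = "30s"
  log_level = "debug"
```
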
@@ -502,6 +496,8 @@ Parameters that can be used with any output plugin:
 - **name_override**: Override the original name of the measurement.
 - **name_prefix**: Specifies a prefix to attach to the measurement name.
 - **name_suffix**: Specifies a suffix to attach to the measurement name.
+- **log_level**: Override the log-level for this plugin. Possible values are
+  `error`, `warn`, `info` and `debug`.
 
 The [metric filtering][] parameters can be used to limit what metrics are
 emitted from the output plugin.
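
For example (illustrative plugin and values), an output that should only report errors:

```toml
[[outputs.file]]
  files = ["stdout"]
  log_level = "error"
```
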
@@ -540,6 +536,8 @@ Parameters that can be used with any processor plugin:
   If this is not specified then processor execution order will be the order in
   the config. Processors without "order" will take precedence over those
   with a defined order.
+- **log_level**: Override the log-level for this plugin. Possible values are
+  `error`, `warn`, `info` and `debug`.
 
 The [metric filtering][] parameters can be used to limit what metrics are
 handled by the processor. Excluded metrics are passed downstream to the next
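
For example (illustrative), debugging a single processor in a chain while keeping its position explicit:

```toml
[[processors.rename]]
  order = 1
  log_level = "debug"
```
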
@@ -592,6 +590,8 @@ Parameters that can be used with any aggregator plugin:
 - **name_prefix**: Specifies a prefix to attach to the measurement name.
 - **name_suffix**: Specifies a suffix to attach to the measurement name.
 - **tags**: A map of tags to apply to the measurement - behavior varies based on aggregator.
+- **log_level**: Override the log-level for this plugin. Possible values are
+  `error`, `warn`, `info` and `debug`.
 
 The [metric filtering][] parameters can be used to limit what metrics are
 handled by the aggregator. Excluded metrics are passed downstream to the next
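
For example (illustrative), an aggregator that only logs warnings and errors:

```toml
[[aggregators.minmax]]
  period = "30s"
  log_level = "warn"
```
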

logger.go

@@ -16,6 +16,20 @@ const (
 	Debug
 )
 
+func LogLevelFromString(name string) LogLevel {
+	switch name {
+	case "ERROR", "error":
+		return Error
+	case "WARN", "warn":
+		return Warn
+	case "INFO", "info":
+		return Info
+	case "DEBUG", "debug":
+		return Debug
+	}
+	return None
+}
+
 func (e LogLevel) String() string {
 	switch e {
 	case Error:
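
A small usage sketch of the new helper; the loop and sample values are illustrative, only `LogLevelFromString`, the level constants, and `String()` come from this file:

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf"
)

func main() {
	// Only the all-upper or all-lower spellings are recognized;
	// anything else maps to telegraf.None.
	for _, name := range []string{"debug", "WARN", "Verbose", ""} {
		if level := telegraf.LogLevelFromString(name); level != telegraf.None {
			fmt.Printf("%q parsed as %s\n", name, level)
		} else {
			fmt.Printf("%q is not a valid log level\n", name)
		}
	}
}
```
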

logger/logger.go

@@ -55,6 +55,19 @@ func New(category, name, alias string) *logger {
 	return l
 }
 
+// SetLogLevel changes the log-level to the given one
+func (l *logger) SetLogLevel(name string) error {
+	if name == "" {
+		return nil
+	}
+	level := telegraf.LogLevelFromString(name)
+	if level == telegraf.None {
+		return fmt.Errorf("invalid log-level %q", name)
+	}
+	l.level = &level
+	return nil
+}
+
 // SubLogger creates a new logger with the given name added as suffix
 func (l *logger) SubLogger(name string) telegraf.Logger {
 	suffix := l.suffix
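
The hunk does not show how the stored pointer interacts with the agent-wide level; below is a minimal self-contained sketch of the plausible precedence rule (an assumption, not the actual logger internals):

```go
package main

import "fmt"

// Local stand-in for telegraf.LogLevel, just for this sketch.
type LogLevel int

const (
	None LogLevel = iota
	Error
	Warn
	Info
	Debug
)

// effectiveLevel assumes the semantics suggested by SetLogLevel above: the
// override is a pointer, so "unset" (nil) and "explicitly set" stay
// distinguishable, and a set override shadows the global level.
func effectiveLevel(override *LogLevel, global LogLevel) LogLevel {
	if override != nil {
		return *override
	}
	return global
}

func main() {
	global := Info
	debug := Debug
	fmt.Println(effectiveLevel(nil, global))    // prints 3 (Info): no override set
	fmt.Println(effectiveLevel(&debug, global)) // prints 4 (Debug): override wins
}
```
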

models/running_aggregator.go

@@ -35,7 +35,9 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
 	logger.RegisterErrorCallback(func() {
 		aggErrorsRegister.Incr(1)
 	})
-
+	if err := logger.SetLogLevel(config.LogLevel); err != nil {
+		logger.Error(err)
+	}
 	SetLoggerOnPlugin(aggregator, logger)
 
 	return &RunningAggregator{
@@ -74,6 +76,7 @@ type AggregatorConfig struct {
 	Period time.Duration
 	Delay time.Duration
 	Grace time.Duration
+	LogLevel string
 
 	NameOverride string
 	MeasurementPrefix string
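
The same three-line wiring recurs in every running_* constructor below, and it encodes a design choice: an unknown `log_level` is reported through the plugin's own logger and the plugin starts anyway with the inherited level, rather than failing the whole configuration. A hypothetical miniature of that behavior (the names are stand-ins, not Telegraf APIs):

```go
package main

import (
	"errors"
	"fmt"
)

// setLogLevel mimics logger.SetLogLevel: empty means "no override",
// unknown names are an error.
func setLogLevel(name string) error {
	switch name {
	case "", "error", "warn", "info", "debug":
		return nil
	}
	return errors.New("invalid log-level " + name)
}

func main() {
	if err := setLogLevel("chatty"); err != nil {
		fmt.Println("E!", err) // logged, but plugin creation continues
	}
}
```
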

models/running_input.go

@@ -46,6 +46,9 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
 		inputErrorsRegister.Incr(1)
 		GlobalGatherErrors.Incr(1)
 	})
+	if err := logger.SetLogLevel(config.LogLevel); err != nil {
+		logger.Error(err)
+	}
 	SetLoggerOnPlugin(input, logger)
 
 	return &RunningInput{
@@ -85,6 +88,7 @@ type InputConfig struct {
 	CollectionOffset time.Duration
 	Precision time.Duration
 	StartupErrorBehavior string
+	LogLevel string
 
 	NameOverride string
 	MeasurementPrefix string

models/running_output.go

@@ -40,6 +40,8 @@ type OutputConfig struct {
 	BufferStrategy string
 	BufferDirectory string
+
+	LogLevel string
 }
 
 // RunningOutput contains the output configuration
@@ -84,6 +86,9 @@ func NewRunningOutput(
 	logger.RegisterErrorCallback(func() {
 		writeErrorsRegister.Incr(1)
 	})
+	if err := logger.SetLogLevel(config.LogLevel); err != nil {
+		logger.Error(err)
+	}
 	SetLoggerOnPlugin(output, logger)
 
 	if config.MetricBufferLimit > 0 {

models/running_parser.go

@@ -28,6 +28,9 @@ func NewRunningParser(parser telegraf.Parser, config *ParserConfig) *RunningPars
 	logger.RegisterErrorCallback(func() {
 		parserErrorsRegister.Incr(1)
 	})
+	if err := logger.SetLogLevel(config.LogLevel); err != nil {
+		logger.Error(err)
+	}
 	SetLoggerOnPlugin(parser, logger)
 
 	return &RunningParser{
@@ -53,6 +56,7 @@ type ParserConfig struct {
 	Alias string
 	DataFormat string
 	DefaultTags map[string]string
+	LogLevel string
 }
 
 func (r *RunningParser) LogName() string {

models/running_processor.go

@@ -23,11 +23,12 @@ func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp
 
 // ProcessorConfig containing a name and filter
 type ProcessorConfig struct {
 	Name string
 	Alias string
 	ID string
 	Order int64
 	Filter Filter
+	LogLevel string
 }
 
 func NewRunningProcessor(processor telegraf.StreamingProcessor, config *ProcessorConfig) *RunningProcessor {
@@ -41,6 +42,9 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
 	logger.RegisterErrorCallback(func() {
 		processErrorsRegister.Incr(1)
 	})
+	if err := logger.SetLogLevel(config.LogLevel); err != nil {
+		logger.Error(err)
+	}
 	SetLoggerOnPlugin(processor, logger)
 
 	return &RunningProcessor{

models/running_serializer.go

@@ -15,6 +15,7 @@ type SerializerConfig struct {
 	Alias string
 	DataFormat string
 	DefaultTags map[string]string
+	LogLevel string
 }
 
 type RunningSerializer struct {
@@ -38,6 +39,9 @@ func NewRunningSerializer(serializer serializers.Serializer, config *SerializerC
 	logger.RegisterErrorCallback(func() {
 		serializerErrorsRegister.Incr(1)
 	})
+	if err := logger.SetLogLevel(config.LogLevel); err != nil {
+		logger.Error(err)
+	}
 	SetLoggerOnPlugin(serializer, logger)
 
 	return &RunningSerializer{
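
Putting the pieces together, a hedged construction sketch: `NewRunningParser`, `ParserConfig`, and its fields appear in this commit, while the parser choice and the concrete values are illustrative:

```go
package main

import (
	"log"

	"github.com/influxdata/telegraf/models"
	"github.com/influxdata/telegraf/plugins/parsers/influx"
)

func main() {
	// LogLevel travels with the config struct; NewRunningParser applies it
	// to the plugin-scoped logger before the parser is used.
	running := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{
		Parent:     "file",
		DataFormat: "influx",
		LogLevel:   "debug", // per-plugin override of the agent-wide level
	})
	if err := running.Init(); err != nil {
		log.Fatal(err)
	}
}
```
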