telegraf/plugins/parsers/registry.go

430 lines
14 KiB
Go
Raw Normal View History

package parsers
import (
"fmt"
"github.com/influxdata/telegraf"
2017-04-13 01:41:26 +08:00
"github.com/influxdata/telegraf/plugins/parsers/collectd"
2018-08-25 07:40:41 +08:00
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
2021-06-11 03:22:18 +08:00
"github.com/influxdata/telegraf/plugins/parsers/json_v2"
2018-08-23 04:55:41 +08:00
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
"github.com/influxdata/telegraf/plugins/parsers/value"
2018-08-14 07:37:06 +08:00
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
"github.com/influxdata/telegraf/plugins/parsers/xml"
)
// ParserFunc is a function that produces a Parser or reports an error.
type ParserFunc func() (Parser, error)
// ParserInput is an interface for input plugins that are able to parse
// arbitrary data formats.
type ParserInput interface {
// SetParser sets the parser for the plugin.
SetParser(parser Parser)
}
// ParserFuncInput is an interface for input plugins that are able to parse
// arbitrary data formats and that take a ParserFunc rather than a Parser.
type ParserFuncInput interface {
// SetParserFunc sets the function the plugin calls to obtain a parser.
SetParserFunc(fn ParserFunc)
}
// Parser is an interface defining functions that a parser plugin must satisfy.
type Parser interface {
// Parse takes a byte buffer separated by newlines
// ie, `cpu.usage.idle 90\ncpu.usage.busy 10`
// and parses it into telegraf metrics
//
// Must be thread-safe.
Parse(buf []byte) ([]telegraf.Metric, error)
// ParseLine takes a single string metric
// ie, "cpu.usage.idle 90"
// and parses it into a telegraf metric.
//
// Must be thread-safe.
2021-06-11 03:22:18 +08:00
// This function is only called by plugins that expect line based protocols
// Doesn't need to be implemented by non-linebased parsers (e.g. json, xml)
ParseLine(line string) (telegraf.Metric, error)
2016-02-10 06:03:46 +08:00
// SetDefaultTags tells the parser to add all of the given tags
// to each parsed metric.
// NOTE: do _not_ modify the map after you've passed it here!!
SetDefaultTags(tags map[string]string)
}
// Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers.
type Config struct {
// Dataformat can be one of: json, influx, graphite, value, nagios
DataFormat string `toml:"data_format"`
// Separator only applied to Graphite data.
Separator string `toml:"separator"`
// Templates only apply to Graphite data.
Templates []string `toml:"templates"`
// TagKeys only apply to JSON data
TagKeys []string `toml:"tag_keys"`
// Array of glob pattern strings keys that should be added as string fields.
JSONStringFields []string `toml:"json_string_fields"`
JSONNameKey string `toml:"json_name_key"`
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string `toml:"metric_name"`
// holds a gjson path for json parser
JSONQuery string `toml:"json_query"`
// key of time
JSONTimeKey string `toml:"json_time_key"`
// time format
JSONTimeFormat string `toml:"json_time_format"`
// default timezone
JSONTimezone string `toml:"json_timezone"`
2019-10-24 05:06:39 +08:00
// Whether to continue if a JSON object can't be coerced
JSONStrict bool `toml:"json_strict"`
2017-04-13 01:41:26 +08:00
// Authentication file for collectd
CollectdAuthFile string `toml:"collectd_auth_file"`
2017-04-13 01:41:26 +08:00
// One of none (default), sign, or encrypt
CollectdSecurityLevel string `toml:"collectd_security_level"`
2017-04-13 01:41:26 +08:00
// Dataset specification for collectd
CollectdTypesDB []string `toml:"collectd_types_db"`
2017-04-13 01:41:26 +08:00
// whether to split or join multivalue metrics
CollectdSplit string `toml:"collectd_split"`
// DataType only applies to value, this will be the type to parse value to
DataType string `toml:"data_type"`
// DefaultTags are the default tags that will be added to all parsed metrics.
DefaultTags map[string]string `toml:"default_tags"`
// an optional json path containing the metric registry object
// if left empty, the whole json object is parsed as a metric registry
DropwizardMetricRegistryPath string `toml:"dropwizard_metric_registry_path"`
// an optional json path containing the default time of the metrics
// if left empty, the processing time is used
DropwizardTimePath string `toml:"dropwizard_time_path"`
// time format to use for parsing the time field
// defaults to time.RFC3339
DropwizardTimeFormat string `toml:"dropwizard_time_format"`
// an optional json path pointing to a json object with tag key/value pairs
// takes precedence over DropwizardTagPathsMap
DropwizardTagsPath string `toml:"dropwizard_tags_path"`
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string `toml:"dropwizard_tag_paths_map"`
//grok patterns
GrokPatterns []string `toml:"grok_patterns"`
GrokNamedPatterns []string `toml:"grok_named_patterns"`
GrokCustomPatterns string `toml:"grok_custom_patterns"`
GrokCustomPatternFiles []string `toml:"grok_custom_pattern_files"`
GrokTimezone string `toml:"grok_timezone"`
GrokUniqueTimestamp string `toml:"grok_unique_timestamp"`
2018-08-25 07:40:41 +08:00
//csv configuration
CSVColumnNames []string `toml:"csv_column_names"`
CSVColumnTypes []string `toml:"csv_column_types"`
CSVComment string `toml:"csv_comment"`
CSVDelimiter string `toml:"csv_delimiter"`
CSVHeaderRowCount int `toml:"csv_header_row_count"`
CSVMeasurementColumn string `toml:"csv_measurement_column"`
CSVSkipColumns int `toml:"csv_skip_columns"`
CSVSkipRows int `toml:"csv_skip_rows"`
CSVTagColumns []string `toml:"csv_tag_columns"`
CSVTimestampColumn string `toml:"csv_timestamp_column"`
CSVTimestampFormat string `toml:"csv_timestamp_format"`
CSVTimezone string `toml:"csv_timezone"`
CSVTrimSpace bool `toml:"csv_trim_space"`
CSVSkipValues []string `toml:"csv_skip_values"`
2019-06-18 04:34:54 +08:00
// FormData configuration
FormUrlencodedTagKeys []string `toml:"form_urlencoded_tag_keys"`
// Value configuration
ValueFieldName string `toml:"value_field_name"`
// XML configuration
XMLConfig []XMLConfig `toml:"xml"`
2021-06-11 03:22:18 +08:00
// JSONPath configuration
JSONV2Config []JSONV2Config `toml:"json_v2"`
}
// XMLConfig embeds xml.Config. Config.XMLConfig holds a slice of these under
// the TOML key "xml", and NewXMLParser copies them into xml.Config values.
type XMLConfig struct {
xml.Config
}
2021-06-11 03:22:18 +08:00
// JSONV2Config embeds json_v2.Config. Config.JSONV2Config holds a slice of
// these under the TOML key "json_v2", and NewJSONPathParser copies them into
// json_v2.Config values.
type JSONV2Config struct {
json_v2.Config
}
// NewParser returns a Parser interface based on the given config.
func NewParser(config *Config) (Parser, error) {
var err error
var parser Parser
switch config.DataFormat {
case "json":
parser, err = json.New(
&json.Config{
MetricName: config.MetricName,
TagKeys: config.TagKeys,
NameKey: config.JSONNameKey,
StringFields: config.JSONStringFields,
Query: config.JSONQuery,
TimeKey: config.JSONTimeKey,
TimeFormat: config.JSONTimeFormat,
Timezone: config.JSONTimezone,
DefaultTags: config.DefaultTags,
2019-10-24 05:06:39 +08:00
Strict: config.JSONStrict,
},
)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.ValueFieldName, config.DefaultTags)
case "influx":
parser, err = NewInfluxParser()
case "nagios":
parser, err = NewNagiosParser()
case "graphite":
parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags)
2017-04-13 01:41:26 +08:00
case "collectd":
parser, err = NewCollectdParser(config.CollectdAuthFile,
config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit)
case "dropwizard":
parser, err = NewDropwizardParser(
config.DropwizardMetricRegistryPath,
config.DropwizardTimePath,
config.DropwizardTimeFormat,
config.DropwizardTagsPath,
config.DropwizardTagPathsMap,
config.DefaultTags,
config.Separator,
config.Templates)
2018-08-14 07:37:06 +08:00
case "wavefront":
parser, err = NewWavefrontParser(config.DefaultTags)
case "grok":
parser, err = newGrokParser(
config.MetricName,
config.GrokPatterns,
config.GrokNamedPatterns,
config.GrokCustomPatterns,
config.GrokCustomPatternFiles,
config.GrokTimezone,
config.GrokUniqueTimestamp)
2018-08-25 07:40:41 +08:00
case "csv":
config := &csv.Config{
MetricName: config.MetricName,
HeaderRowCount: config.CSVHeaderRowCount,
SkipRows: config.CSVSkipRows,
SkipColumns: config.CSVSkipColumns,
Delimiter: config.CSVDelimiter,
Comment: config.CSVComment,
TrimSpace: config.CSVTrimSpace,
ColumnNames: config.CSVColumnNames,
ColumnTypes: config.CSVColumnTypes,
TagColumns: config.CSVTagColumns,
MeasurementColumn: config.CSVMeasurementColumn,
TimestampColumn: config.CSVTimestampColumn,
TimestampFormat: config.CSVTimestampFormat,
Timezone: config.CSVTimezone,
DefaultTags: config.DefaultTags,
SkipValues: config.CSVSkipValues,
}
return csv.NewParser(config)
2018-08-23 04:55:41 +08:00
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
case "form_urlencoded":
parser, err = NewFormUrlencodedParser(
2019-06-18 04:34:54 +08:00
config.MetricName,
config.DefaultTags,
config.FormUrlencodedTagKeys,
2019-06-18 04:34:54 +08:00
)
case "prometheus":
parser, err = NewPrometheusParser(config.DefaultTags)
case "prometheusremotewrite":
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
case "xml":
parser, err = NewXMLParser(config.MetricName, config.DefaultTags, config.XMLConfig)
2021-06-11 03:22:18 +08:00
case "json_v2":
parser, err = NewJSONPathParser(config.JSONV2Config)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
return parser, err
}
// newGrokParser builds a grok parser from the given settings and compiles it.
//
// metricName becomes the parser's Measurement; the remaining arguments map
// one-to-one onto the Patterns, NamedPatterns, CustomPatterns,
// CustomPatternFiles, Timezone and UniqueTimestamp fields. If Compile fails,
// its error is returned together with a nil Parser.
func newGrokParser(metricName string,
	patterns []string, nPatterns []string,
	cPatterns string, cPatternFiles []string,
	tZone string, uniqueTimestamp string) (Parser, error) {
	parser := grok.Parser{
		Measurement:        metricName,
		Patterns:           patterns,
		NamedPatterns:      nPatterns,
		CustomPatterns:     cPatterns,
		CustomPatternFiles: cPatternFiles,
		Timezone:           tZone,
		UniqueTimestamp:    uniqueTimestamp,
	}

	// Do not hand out a parser whose compilation failed.
	if err := parser.Compile(); err != nil {
		return nil, err
	}
	return &parser, nil
}
// NewNagiosParser returns a zero-valued nagios parser. The error is always nil.
func NewNagiosParser() (Parser, error) {
	p := &nagios.NagiosParser{}
	return p, nil
}
// NewInfluxParser returns an influx parser wired to a newly created metric
// handler. The error is always nil.
func NewInfluxParser() (Parser, error) {
	return influx.NewParser(influx.NewMetricHandler()), nil
}
// NewGraphiteParser forwards separator, templates and defaultTags to
// graphite.NewGraphiteParser and returns its result unchanged.
func NewGraphiteParser(
	separator string,
	templates []string,
	defaultTags map[string]string,
) (Parser, error) {
	p, err := graphite.NewGraphiteParser(separator, templates, defaultTags)
	return p, err
}
// NewValueParser forwards its arguments to value.NewValueParser and returns
// the resulting parser. The error is always nil.
func NewValueParser(
	metricName string,
	dataType string,
	fieldName string,
	defaultTags map[string]string,
) (Parser, error) {
	p := value.NewValueParser(metricName, dataType, fieldName, defaultTags)
	return p, nil
}
2017-04-13 01:41:26 +08:00
func NewCollectdParser(
authFile string,
securityLevel string,
typesDB []string,
split string,
2017-04-13 01:41:26 +08:00
) (Parser, error) {
return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split)
2017-04-13 01:41:26 +08:00
}
// NewDropwizardParser creates a dropwizard parser, copies the path, time
// format and tag settings onto it, and then applies separator and templates
// via SetTemplates. If SetTemplates fails, its error is returned with a nil
// Parser.
func NewDropwizardParser(
	metricRegistryPath string,
	timePath string,
	timeFormat string,
	tagsPath string,
	tagPathsMap map[string]string,
	defaultTags map[string]string,
	separator string,
	templates []string,
) (Parser, error) {
	p := dropwizard.NewParser()
	p.MetricRegistryPath = metricRegistryPath
	p.TimePath = timePath
	p.TimeFormat = timeFormat
	p.TagsPath = tagsPath
	p.TagPathsMap = tagPathsMap
	p.DefaultTags = defaultTags

	if err := p.SetTemplates(separator, templates); err != nil {
		return nil, err
	}
	return p, nil
}
2018-08-14 07:37:06 +08:00
2018-08-23 04:55:41 +08:00
// NewLogFmtParser returns the parser created by logfmt.NewParser for the given
// metric name and default tags. The error is always nil.
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
	p := logfmt.NewParser(metricName, defaultTags)
	return p, nil
}
2018-08-14 07:37:06 +08:00
// NewWavefrontParser returns the parser created by
// wavefront.NewWavefrontParser for the given default tags. The error is
// always nil.
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
	p := wavefront.NewWavefrontParser(defaultTags)
	return p, nil
}
2019-06-18 04:34:54 +08:00
func NewFormUrlencodedParser(
2019-06-18 04:34:54 +08:00
metricName string,
defaultTags map[string]string,
tagKeys []string,
) (Parser, error) {
return &form_urlencoded.Parser{
2019-06-18 04:34:54 +08:00
MetricName: metricName,
DefaultTags: defaultTags,
TagKeys: tagKeys,
}, nil
}
// NewPrometheusParser returns a prometheus parser whose DefaultTags field is
// set to defaultTags. The error is always nil.
func NewPrometheusParser(defaultTags map[string]string) (Parser, error) {
	p := &prometheus.Parser{DefaultTags: defaultTags}
	return p, nil
}
// NewPrometheusRemoteWriteParser returns a prometheusremotewrite parser whose
// DefaultTags field is set to defaultTags. The error is always nil.
func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) {
	p := &prometheusremotewrite.Parser{DefaultTags: defaultTags}
	return p, nil
}
// NewXMLParser builds an xml parser from the given per-section configs.
//
// Each XMLConfig is copied field by field into a fresh xml.Config; the
// MetricName of every copy is set to metricName rather than taken from the
// source entry. The error is always nil.
func NewXMLParser(metricName string, defaultTags map[string]string, xmlConfigs []XMLConfig) (Parser, error) {
	converted := make([]xml.Config, len(xmlConfigs))
	for i := range xmlConfigs {
		src, dst := &xmlConfigs[i], &converted[i]

		dst.MetricName = metricName
		dst.MetricQuery = src.MetricQuery
		dst.Selection = src.Selection
		dst.Timestamp = src.Timestamp
		dst.TimestampFmt = src.TimestampFmt
		dst.Tags = src.Tags
		dst.Fields = src.Fields
		dst.FieldsInt = src.FieldsInt
		dst.FieldSelection = src.FieldSelection
		dst.FieldNameQuery = src.FieldNameQuery
		dst.FieldValueQuery = src.FieldValueQuery
		dst.FieldNameExpand = src.FieldNameExpand
	}

	p := &xml.Parser{
		Configs:     converted,
		DefaultTags: defaultTags,
	}
	return p, nil
}
2021-06-11 03:22:18 +08:00
// NewJSONPathParser builds a json_v2 parser from the given configs, copying
// each JSONV2Config field by field into a fresh json_v2.Config. The error is
// always nil.
func NewJSONPathParser(jsonv2config []JSONV2Config) (Parser, error) {
	converted := make([]json_v2.Config, len(jsonv2config))
	for i := range jsonv2config {
		src, dst := &jsonv2config[i], &converted[i]

		dst.MeasurementName = src.MeasurementName
		dst.MeasurementNamePath = src.MeasurementNamePath
		dst.TimestampPath = src.TimestampPath
		dst.TimestampFormat = src.TimestampFormat
		dst.TimestampTimezone = src.TimestampTimezone
		dst.Fields = src.Fields
		dst.Tags = src.Tags
		dst.JSONObjects = src.JSONObjects
	}

	p := &json_v2.Parser{Configs: converted}
	return p, nil
}