feat: Migrate collectd parser to new style (#11367)
This commit is contained in:
parent
fcc9373eba
commit
bf4e0500f7
|
|
@ -1762,7 +1762,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
|
||||||
// Parser options to ignore
|
// Parser options to ignore
|
||||||
case "data_type", "separator", "tag_keys",
|
case "data_type", "separator", "tag_keys",
|
||||||
// "templates", // shared with serializers
|
// "templates", // shared with serializers
|
||||||
"collectd_auth_file", "collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb",
|
|
||||||
"dropwizard_metric_registry_path", "dropwizard_tags_path", "dropwizard_tag_paths",
|
"dropwizard_metric_registry_path", "dropwizard_tags_path", "dropwizard_tag_paths",
|
||||||
"dropwizard_time_format", "dropwizard_time_path",
|
"dropwizard_time_format", "dropwizard_time_path",
|
||||||
"form_urlencoded_tag_keys",
|
"form_urlencoded_tag_keys",
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package all
|
||||||
|
|
||||||
import (
|
import (
|
||||||
//Blank imports for plugins to register themselves
|
//Blank imports for plugins to register themselves
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/parsers/collectd"
|
||||||
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
|
_ "github.com/influxdata/telegraf/plugins/parsers/csv"
|
||||||
_ "github.com/influxdata/telegraf/plugins/parsers/json"
|
_ "github.com/influxdata/telegraf/plugins/parsers/json"
|
||||||
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
|
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
|
||||||
|
|
|
||||||
|
|
@ -10,70 +10,62 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultAuthFile = "/etc/collectd/auth_file"
|
DefaultAuthFile = "/etc/collectd/auth_file"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CollectdParser struct {
|
type Parser struct {
|
||||||
// DefaultTags will be added to every parsed metric
|
DefaultTags map[string]string `toml:"-"`
|
||||||
DefaultTags map[string]string
|
|
||||||
|
|
||||||
//whether or not to split multi value metric into multiple metrics
|
//whether or not to split multi value metric into multiple metrics
|
||||||
//default value is split
|
//default value is split
|
||||||
ParseMultiValue string
|
ParseMultiValue string `toml:"collectd_parse_multivalue"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
|
||||||
popts network.ParseOpts
|
popts network.ParseOpts
|
||||||
|
AuthFile string `toml:"collectd_auth_file"`
|
||||||
|
SecurityLevel string `toml:"collectd_security_level"`
|
||||||
|
TypesDB []string `toml:"collectd_typesdb"`
|
||||||
|
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *CollectdParser) SetParseOpts(popts *network.ParseOpts) {
|
func (p *Parser) Init() error {
|
||||||
p.popts = *popts
|
switch p.SecurityLevel {
|
||||||
}
|
|
||||||
|
|
||||||
func NewCollectdParser(
|
|
||||||
authFile string,
|
|
||||||
securityLevel string,
|
|
||||||
typesDB []string,
|
|
||||||
split string,
|
|
||||||
) (*CollectdParser, error) {
|
|
||||||
popts := network.ParseOpts{}
|
|
||||||
|
|
||||||
switch securityLevel {
|
|
||||||
case "none":
|
case "none":
|
||||||
popts.SecurityLevel = network.None
|
p.popts.SecurityLevel = network.None
|
||||||
case "sign":
|
case "sign":
|
||||||
popts.SecurityLevel = network.Sign
|
p.popts.SecurityLevel = network.Sign
|
||||||
case "encrypt":
|
case "encrypt":
|
||||||
popts.SecurityLevel = network.Encrypt
|
p.popts.SecurityLevel = network.Encrypt
|
||||||
default:
|
default:
|
||||||
popts.SecurityLevel = network.None
|
p.popts.SecurityLevel = network.None
|
||||||
}
|
}
|
||||||
|
|
||||||
if authFile == "" {
|
if p.AuthFile == "" {
|
||||||
authFile = DefaultAuthFile
|
p.AuthFile = DefaultAuthFile
|
||||||
}
|
}
|
||||||
popts.PasswordLookup = network.NewAuthFile(authFile)
|
p.popts.PasswordLookup = network.NewAuthFile(p.AuthFile)
|
||||||
|
|
||||||
for _, path := range typesDB {
|
for _, path := range p.TypesDB {
|
||||||
db, err := LoadTypesDB(path)
|
db, err := LoadTypesDB(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if popts.TypesDB != nil {
|
if p.popts.TypesDB != nil {
|
||||||
popts.TypesDB.Merge(db)
|
p.popts.TypesDB.Merge(db)
|
||||||
} else {
|
} else {
|
||||||
popts.TypesDB = db
|
p.popts.TypesDB = db
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
parser := CollectdParser{popts: popts,
|
return nil
|
||||||
ParseMultiValue: split}
|
|
||||||
return &parser, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
valueLists, err := network.Parse(buf, p.popts)
|
valueLists, err := network.Parse(buf, p.popts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("collectd parser error: %s", err)
|
return nil, fmt.Errorf("collectd parser error: %s", err)
|
||||||
|
|
@ -98,7 +90,7 @@ func (p *CollectdParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
return metrics, nil
|
return metrics, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *CollectdParser) ParseLine(line string) (telegraf.Metric, error) {
|
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||||
metrics, err := p.Parse([]byte(line))
|
metrics, err := p.Parse([]byte(line))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -111,12 +103,12 @@ func (p *CollectdParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||||
return metrics[0], nil
|
return metrics[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *CollectdParser) SetDefaultTags(tags map[string]string) {
|
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
||||||
p.DefaultTags = tags
|
p.DefaultTags = tags
|
||||||
}
|
}
|
||||||
|
|
||||||
// unmarshalValueList translates a ValueList into a Telegraf metric.
|
// unmarshalValueList translates a ValueList into a Telegraf metric.
|
||||||
func (p *CollectdParser) unmarshalValueList(vl *api.ValueList) []telegraf.Metric {
|
func (p *Parser) unmarshalValueList(vl *api.ValueList) []telegraf.Metric {
|
||||||
timestamp := vl.Time.UTC()
|
timestamp := vl.Time.UTC()
|
||||||
|
|
||||||
var metrics []telegraf.Metric
|
var metrics []telegraf.Metric
|
||||||
|
|
@ -205,3 +197,21 @@ func LoadTypesDB(path string) (*api.TypesDB, error) {
|
||||||
}
|
}
|
||||||
return api.NewTypesDB(reader)
|
return api.NewTypesDB(reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
parsers.Add("collectd",
|
||||||
|
func(_ string) telegraf.Parser {
|
||||||
|
return &Parser{
|
||||||
|
AuthFile: DefaultAuthFile,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Parser) InitFromConfig(config *parsers.Config) error {
|
||||||
|
p.AuthFile = config.CollectdAuthFile
|
||||||
|
p.SecurityLevel = config.CollectdSecurityLevel
|
||||||
|
p.TypesDB = config.CollectdTypesDB
|
||||||
|
p.ParseMultiValue = config.CollectdSplit
|
||||||
|
|
||||||
|
return p.Init()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -108,8 +108,10 @@ var multiMetric = testCase{
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewCollectdParser(t *testing.T) {
|
func TestNewCollectdParser(t *testing.T) {
|
||||||
parser, err := NewCollectdParser("", "", []string{}, "join")
|
parser := Parser{
|
||||||
require.NoError(t, err)
|
ParseMultiValue: "join",
|
||||||
|
}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
require.Equal(t, parser.popts.SecurityLevel, network.None)
|
require.Equal(t, parser.popts.SecurityLevel, network.None)
|
||||||
require.NotNil(t, parser.popts.PasswordLookup)
|
require.NotNil(t, parser.popts.PasswordLookup)
|
||||||
require.Nil(t, parser.popts.TypesDB)
|
require.Nil(t, parser.popts.TypesDB)
|
||||||
|
|
@ -124,8 +126,8 @@ func TestParse(t *testing.T) {
|
||||||
bytes, err := buf.Bytes()
|
bytes, err := buf.Bytes()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
parser := &CollectdParser{}
|
parser := &Parser{}
|
||||||
require.NoError(t, err)
|
require.NoError(t, parser.Init())
|
||||||
metrics, err := parser.Parse(bytes)
|
metrics, err := parser.Parse(bytes)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
@ -139,20 +141,36 @@ func TestParseMultiValueSplit(t *testing.T) {
|
||||||
bytes, err := buf.Bytes()
|
bytes, err := buf.Bytes()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
parser := &CollectdParser{ParseMultiValue: "split"}
|
parser := &Parser{ParseMultiValue: "split"}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
metrics, err := parser.Parse(bytes)
|
metrics, err := parser.Parse(bytes)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, 2, len(metrics))
|
require.Equal(t, 2, len(metrics))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseMultiValueJoin(t *testing.T) {
|
||||||
|
buf, err := writeValueList(multiMetric.vl)
|
||||||
|
require.NoError(t, err)
|
||||||
|
bytes, err := buf.Bytes()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
parser := &Parser{ParseMultiValue: "join"}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
metrics, err := parser.Parse(bytes)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, 1, len(metrics))
|
||||||
|
}
|
||||||
|
|
||||||
func TestParse_DefaultTags(t *testing.T) {
|
func TestParse_DefaultTags(t *testing.T) {
|
||||||
buf, err := writeValueList(singleMetric.vl)
|
buf, err := writeValueList(singleMetric.vl)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
bytes, err := buf.Bytes()
|
bytes, err := buf.Bytes()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
parser := &CollectdParser{}
|
parser := &Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
parser.SetDefaultTags(map[string]string{
|
parser.SetDefaultTags(map[string]string{
|
||||||
"foo": "bar",
|
"foo": "bar",
|
||||||
})
|
})
|
||||||
|
|
@ -164,16 +182,11 @@ func TestParse_DefaultTags(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParse_SignSecurityLevel(t *testing.T) {
|
func TestParse_SignSecurityLevel(t *testing.T) {
|
||||||
parser := &CollectdParser{}
|
parser := &Parser{
|
||||||
popts := &network.ParseOpts{
|
SecurityLevel: "sign",
|
||||||
SecurityLevel: network.Sign,
|
AuthFile: "testdata/authfile",
|
||||||
PasswordLookup: &AuthMap{
|
|
||||||
map[string]string{
|
|
||||||
"user0": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
parser.SetParseOpts(popts)
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
// Signed data
|
// Signed data
|
||||||
buf, err := writeValueList(singleMetric.vl)
|
buf, err := writeValueList(singleMetric.vl)
|
||||||
|
|
@ -219,16 +232,11 @@ func TestParse_SignSecurityLevel(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParse_EncryptSecurityLevel(t *testing.T) {
|
func TestParse_EncryptSecurityLevel(t *testing.T) {
|
||||||
parser := &CollectdParser{}
|
parser := &Parser{
|
||||||
popts := &network.ParseOpts{
|
SecurityLevel: "encrypt",
|
||||||
SecurityLevel: network.Encrypt,
|
AuthFile: "testdata/authfile",
|
||||||
PasswordLookup: &AuthMap{
|
|
||||||
map[string]string{
|
|
||||||
"user0": "bar",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
parser.SetParseOpts(popts)
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
// Signed data skipped
|
// Signed data skipped
|
||||||
buf, err := writeValueList(singleMetric.vl)
|
buf, err := writeValueList(singleMetric.vl)
|
||||||
|
|
@ -278,9 +286,10 @@ func TestParseLine(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
bytes, err := buf.Bytes()
|
bytes, err := buf.Bytes()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
parser := Parser{
|
||||||
parser, err := NewCollectdParser("", "", []string{}, "split")
|
ParseMultiValue: "split",
|
||||||
require.NoError(t, err)
|
}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
metric, err := parser.ParseLine(string(bytes))
|
metric, err := parser.ParseLine(string(bytes))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
user0: bar
|
||||||
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/collectd"
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
|
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
|
"github.com/influxdata/telegraf/plugins/parsers/form_urlencoded"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
||||||
|
|
@ -218,9 +217,6 @@ func NewParser(config *Config) (Parser, error) {
|
||||||
case "graphite":
|
case "graphite":
|
||||||
parser, err = NewGraphiteParser(config.Separator,
|
parser, err = NewGraphiteParser(config.Separator,
|
||||||
config.Templates, config.DefaultTags)
|
config.Templates, config.DefaultTags)
|
||||||
case "collectd":
|
|
||||||
parser, err = NewCollectdParser(config.CollectdAuthFile,
|
|
||||||
config.CollectdSecurityLevel, config.CollectdTypesDB, config.CollectdSplit)
|
|
||||||
case "dropwizard":
|
case "dropwizard":
|
||||||
parser, err = NewDropwizardParser(
|
parser, err = NewDropwizardParser(
|
||||||
config.DropwizardMetricRegistryPath,
|
config.DropwizardMetricRegistryPath,
|
||||||
|
|
@ -321,15 +317,6 @@ func NewValueParser(
|
||||||
return value.NewValueParser(metricName, dataType, fieldName, defaultTags), nil
|
return value.NewValueParser(metricName, dataType, fieldName, defaultTags), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCollectdParser(
|
|
||||||
authFile string,
|
|
||||||
securityLevel string,
|
|
||||||
typesDB []string,
|
|
||||||
split string,
|
|
||||||
) (Parser, error) {
|
|
||||||
return collectd.NewCollectdParser(authFile, securityLevel, typesDB, split)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDropwizardParser(
|
func NewDropwizardParser(
|
||||||
metricRegistryPath string,
|
metricRegistryPath string,
|
||||||
timePath string,
|
timePath string,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue