fix(custom_builder): Correctly handle serializers and parsers (#13446)
This commit is contained in:
parent
390751b94b
commit
a2125f0457
|
|
@ -47,6 +47,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
## Delay before the process is restarted after an unexpected termination
|
## Delay before the process is restarted after an unexpected termination
|
||||||
# restart_delay = "10s"
|
# restart_delay = "10s"
|
||||||
|
|
||||||
|
## Serialization format for communicating with the executed program
|
||||||
|
## Please note that the corresponding data-format must exist both in
|
||||||
|
## parsers and serializers
|
||||||
|
# data_format = "influx"
|
||||||
```
|
```
|
||||||
|
|
||||||
## Example
|
## Example
|
||||||
|
|
|
||||||
|
|
@ -13,3 +13,8 @@
|
||||||
|
|
||||||
## Delay before the process is restarted after an unexpected termination
|
## Delay before the process is restarted after an unexpected termination
|
||||||
# restart_delay = "10s"
|
# restart_delay = "10s"
|
||||||
|
|
||||||
|
## Serialization format for communicating with the executed program
|
||||||
|
## Please note that the corresponding data-format must exist both in
|
||||||
|
## parsers and serializers
|
||||||
|
# data_format = "influx"
|
||||||
|
|
|
||||||
|
|
@ -13,15 +13,20 @@ import (
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
type pluginState map[string]bool
|
type instance struct {
|
||||||
type selection map[string]pluginState
|
category string
|
||||||
|
name string
|
||||||
|
enabled bool
|
||||||
|
dataformat string
|
||||||
|
}
|
||||||
|
|
||||||
|
type selection struct {
|
||||||
|
plugins map[string][]instance
|
||||||
|
}
|
||||||
|
|
||||||
func ImportConfigurations(files, dirs []string) (*selection, int, error) {
|
func ImportConfigurations(files, dirs []string) (*selection, int, error) {
|
||||||
sel := selection(make(map[string]pluginState))
|
sel := &selection{
|
||||||
|
plugins: make(map[string][]instance),
|
||||||
// Initialize the categories
|
|
||||||
for _, category := range categories {
|
|
||||||
sel[category] = make(map[string]bool)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gather all configuration files
|
// Gather all configuration files
|
||||||
|
|
@ -44,12 +49,12 @@ func ImportConfigurations(files, dirs []string) (*selection, int, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(filenames) == 0 {
|
if len(filenames) == 0 {
|
||||||
return &sel, 0, errors.New("no configuration files given or found")
|
return sel, 0, errors.New("no configuration files given or found")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do the actual import
|
// Do the actual import
|
||||||
err := sel.importFiles(filenames)
|
err := sel.importFiles(filenames)
|
||||||
return &sel, len(filenames), err
|
return sel, len(filenames), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *selection) Filter(p packageCollection) *packageCollection {
|
func (s *selection) Filter(p packageCollection) *packageCollection {
|
||||||
|
|
@ -57,51 +62,68 @@ func (s *selection) Filter(p packageCollection) *packageCollection {
|
||||||
packages: map[string][]packageInfo{},
|
packages: map[string][]packageInfo{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
implicitlyConfigured := make(map[string]bool)
|
||||||
for category, pkgs := range p.packages {
|
for category, pkgs := range p.packages {
|
||||||
var categoryEnabledPackages []packageInfo
|
|
||||||
settings := (*s)[category]
|
|
||||||
for _, pkg := range pkgs {
|
for _, pkg := range pkgs {
|
||||||
if _, found := settings[pkg.Plugin]; found {
|
key := category + "." + pkg.Plugin
|
||||||
categoryEnabledPackages = append(categoryEnabledPackages, pkg)
|
instances, found := s.plugins[key]
|
||||||
|
if !found {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
|
||||||
enabled.packages[category] = categoryEnabledPackages
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure we update the list of default parsers and serializers used by
|
// The package was configured so add it to the enabled list
|
||||||
// the remaining packages
|
enabled.packages[category] = append(enabled.packages[category], pkg)
|
||||||
enabled.FillDefaultParsers()
|
|
||||||
enabled.FillDefaultSerializers()
|
|
||||||
|
|
||||||
// If the user did not configure any parser, we want to include
|
// Check if the instances configured a data-format and decide if it
|
||||||
// the default parsers if any to preserve a functional set of
|
// is a parser or serializer depending on the plugin type.
|
||||||
// plugins.
|
// If no data-format was configured, check the default settings in
|
||||||
if len(enabled.packages["parsers"]) == 0 && len(enabled.defaultParsers) > 0 {
|
// case this plugin supports a data-format setting but the user
|
||||||
var parsers []packageInfo
|
// didn't set it.
|
||||||
for _, pkg := range p.packages["parsers"] {
|
for _, instance := range instances {
|
||||||
for _, name := range enabled.defaultParsers {
|
parser := pkg.DefaultParser
|
||||||
if pkg.Plugin == name {
|
serializer := pkg.DefaultSerializer
|
||||||
parsers = append(parsers, pkg)
|
if instance.dataformat != "" {
|
||||||
break
|
switch category {
|
||||||
|
case "inputs":
|
||||||
|
parser = instance.dataformat
|
||||||
|
case "processors":
|
||||||
|
parser = instance.dataformat
|
||||||
|
// The execd processor requires both a parser and serializer
|
||||||
|
if pkg.Plugin == "execd" {
|
||||||
|
serializer = instance.dataformat
|
||||||
|
}
|
||||||
|
case "outputs":
|
||||||
|
serializer = instance.dataformat
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if parser != "" {
|
||||||
|
implicitlyConfigured["parsers."+parser] = true
|
||||||
|
}
|
||||||
|
if serializer != "" {
|
||||||
|
implicitlyConfigured["serializers."+serializer] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enabled.packages["parsers"] = parsers
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the user did not configure any serializer, we want to include
|
// Iterate over all plugins AGAIN to add the implicitly configured packages
|
||||||
// the default one if any to preserve a functional set of plugins.
|
// such as parsers and serializers
|
||||||
if len(enabled.packages["serializers"]) == 0 && len(enabled.defaultSerializers) > 0 {
|
for category, pkgs := range p.packages {
|
||||||
var serializers []packageInfo
|
for _, pkg := range pkgs {
|
||||||
for _, pkg := range p.packages["serializers"] {
|
key := category + "." + pkg.Plugin
|
||||||
for _, name := range enabled.defaultSerializers {
|
|
||||||
if pkg.Plugin == name {
|
// Skip the plugins that were explicitly configured as we already
|
||||||
serializers = append(serializers, pkg)
|
// added them above.
|
||||||
break
|
if _, found := s.plugins[key]; found {
|
||||||
}
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the package if it was implicitly configured e.g. by a
|
||||||
|
// 'data_format' setting or by a default value for the data-format
|
||||||
|
if _, implicit := implicitlyConfigured[key]; implicit {
|
||||||
|
enabled.packages[category] = append(enabled.packages[category], pkg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enabled.packages["serializers"] = serializers
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &enabled
|
return &enabled
|
||||||
|
|
@ -134,31 +156,32 @@ func (s *selection) extractPluginsFromConfig(buf []byte) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, found := (*s)[category]; !found {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for name, data := range categoryTbl.Fields {
|
for name, data := range categoryTbl.Fields {
|
||||||
(*s)[category][name] = true
|
key := category + "." + name
|
||||||
|
cfg := instance{
|
||||||
|
category: category,
|
||||||
|
name: name,
|
||||||
|
enabled: true,
|
||||||
|
}
|
||||||
|
|
||||||
// We need to check the data_format field to get all required parsers
|
// We need to check the data_format field to get all required
|
||||||
switch category {
|
// parsers and serializers
|
||||||
case "inputs", "processors":
|
pluginTables, ok := data.([]*ast.Table)
|
||||||
pluginTables, ok := data.([]*ast.Table)
|
if ok {
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, subsubtbl := range pluginTables {
|
for _, subsubtbl := range pluginTables {
|
||||||
|
var dataformat string
|
||||||
for field, fieldData := range subsubtbl.Fields {
|
for field, fieldData := range subsubtbl.Fields {
|
||||||
if field != "data_format" {
|
if field != "data_format" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
kv := fieldData.(*ast.KeyValue)
|
kv := fieldData.(*ast.KeyValue)
|
||||||
name := kv.Value.(*ast.String)
|
option := kv.Value.(*ast.String)
|
||||||
(*s)["parsers"][name.Value] = true
|
dataformat = option.Value
|
||||||
}
|
}
|
||||||
|
cfg.dataformat = dataformat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
s.plugins[key] = append(s.plugins[key], cfg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -93,6 +93,12 @@ func main() {
|
||||||
log.Fatalln("No configuration specified!")
|
log.Fatalln("No configuration specified!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Collect all available plugins
|
||||||
|
packages := packageCollection{}
|
||||||
|
if err := packages.CollectAvailable(); err != nil {
|
||||||
|
log.Fatalf("Collecting plugins failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Import the plugin list from Telegraf configuration files
|
// Import the plugin list from Telegraf configuration files
|
||||||
log.Println("Importing configuration file(s)...")
|
log.Println("Importing configuration file(s)...")
|
||||||
cfg, nfiles, err := ImportConfigurations(configFiles, configDirs)
|
cfg, nfiles, err := ImportConfigurations(configFiles, configDirs)
|
||||||
|
|
@ -108,12 +114,6 @@ func main() {
|
||||||
log.Fatalln("No configuration files loaded!")
|
log.Fatalln("No configuration files loaded!")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect all available plugins
|
|
||||||
packages := packageCollection{}
|
|
||||||
if err := packages.CollectAvailable(); err != nil {
|
|
||||||
log.Fatalf("Collecting plugins failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process the plugin list with the given config. This will
|
// Process the plugin list with the given config. This will
|
||||||
// only keep the plugins that adhere to the filtering criteria.
|
// only keep the plugins that adhere to the filtering criteria.
|
||||||
enabled := cfg.Filter(packages)
|
enabled := cfg.Filter(packages)
|
||||||
|
|
|
||||||
|
|
@ -36,9 +36,7 @@ type packageInfo struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type packageCollection struct {
|
type packageCollection struct {
|
||||||
packages map[string][]packageInfo
|
packages map[string][]packageInfo
|
||||||
defaultParsers []string
|
|
||||||
defaultSerializers []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Define the package exceptions
|
// Define the package exceptions
|
||||||
|
|
@ -105,18 +103,28 @@ func (p *packageCollection) collectPackagesForCategory(category string) error {
|
||||||
// as well as serializers for the output package
|
// as well as serializers for the output package
|
||||||
var defaultParser, defaultSerializer string
|
var defaultParser, defaultSerializer string
|
||||||
switch category {
|
switch category {
|
||||||
case "inputs", "processors":
|
case "inputs":
|
||||||
var err error
|
dataformat, err := extractDefaultDataFormat(path)
|
||||||
defaultParser, err = extractDefaultParser(path)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Getting default parser for %s.%s failed: %v", category, name, err)
|
log.Printf("Getting default data-format for %s.%s failed: %v", category, name, err)
|
||||||
|
}
|
||||||
|
defaultParser = dataformat
|
||||||
|
case "processors":
|
||||||
|
dataformat, err := extractDefaultDataFormat(path)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Getting default data-format for %s.%s failed: %v", category, name, err)
|
||||||
|
}
|
||||||
|
defaultParser = dataformat
|
||||||
|
// The execd processor requires both a parser and serializer
|
||||||
|
if name == "execd" {
|
||||||
|
defaultSerializer = dataformat
|
||||||
}
|
}
|
||||||
case "outputs":
|
case "outputs":
|
||||||
var err error
|
dataformat, err := extractDefaultDataFormat(path)
|
||||||
defaultSerializer, err = extractDefaultSerializer(path)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Getting default serializer for %s.%s failed: %v", category, name, err)
|
log.Printf("Getting default data-format for %s.%s failed: %v", category, name, err)
|
||||||
}
|
}
|
||||||
|
defaultSerializer = dataformat
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, plugin := range registeredNames {
|
for _, plugin := range registeredNames {
|
||||||
|
|
@ -138,46 +146,6 @@ func (p *packageCollection) collectPackagesForCategory(category string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *packageCollection) FillDefaultParsers() {
|
|
||||||
// Make sure we ignore all empty-named parsers which indicate
|
|
||||||
// that there is no parser used by the plugin.
|
|
||||||
parsers := map[string]bool{"": true}
|
|
||||||
|
|
||||||
// Iterate over all plugins that may have parsers and collect
|
|
||||||
// the defaults
|
|
||||||
p.defaultParsers = make([]string, 0)
|
|
||||||
for _, category := range []string{"inputs", "processors"} {
|
|
||||||
for _, pkg := range p.packages[category] {
|
|
||||||
name := pkg.DefaultParser
|
|
||||||
if seen := parsers[name]; seen {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
p.defaultParsers = append(p.defaultParsers, name)
|
|
||||||
parsers[name] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *packageCollection) FillDefaultSerializers() {
|
|
||||||
// Make sure we ignore all empty-named parsers which indicate
|
|
||||||
// that there is no parser used by the plugin.
|
|
||||||
serializers := map[string]bool{"": true}
|
|
||||||
|
|
||||||
// Iterate over all plugins that may have parsers and collect
|
|
||||||
// the defaults
|
|
||||||
p.defaultSerializers = make([]string, 0)
|
|
||||||
for _, category := range []string{"outputs"} {
|
|
||||||
for _, pkg := range p.packages[category] {
|
|
||||||
name := pkg.DefaultSerializer
|
|
||||||
if seen := serializers[name]; seen {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
p.defaultSerializers = append(p.defaultSerializers, name)
|
|
||||||
serializers[name] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *packageCollection) CollectAvailable() error {
|
func (p *packageCollection) CollectAvailable() error {
|
||||||
p.packages = make(map[string][]packageInfo)
|
p.packages = make(map[string][]packageInfo)
|
||||||
|
|
||||||
|
|
@ -187,9 +155,6 @@ func (p *packageCollection) CollectAvailable() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.FillDefaultParsers()
|
|
||||||
p.FillDefaultSerializers()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -347,11 +312,11 @@ func extractRegisteredNames(pkg *ast.Package, pluginType string) []string {
|
||||||
return registeredNames
|
return registeredNames
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractDefaultParser(pluginDir string) (string, error) {
|
func extractDefaultDataFormat(pluginDir string) (string, error) {
|
||||||
re := regexp.MustCompile(`^\s*#?\s*data_format\s*=\s*"(.*)"\s*$`)
|
re := regexp.MustCompile(`^\s*#?\s*data_format\s*=\s*"(.*)"\s*$`)
|
||||||
|
|
||||||
// Exception for exec which uses JSON by default
|
// Exception for exec input which uses JSON by default
|
||||||
if filepath.Base(pluginDir) == "exec" {
|
if filepath.ToSlash(pluginDir) == "plugins/inputs/exec" {
|
||||||
return "json", nil
|
return "json", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -383,35 +348,3 @@ func extractDefaultParser(pluginDir string) (string, error) {
|
||||||
|
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractDefaultSerializer(pluginDir string) (string, error) {
|
|
||||||
re := regexp.MustCompile(`^\s*#?\s*data_format\s*=\s*"(.*)"\s*$`)
|
|
||||||
|
|
||||||
// Walk all config files in the package directory
|
|
||||||
elements, err := os.ReadDir(pluginDir)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, element := range elements {
|
|
||||||
path := filepath.Join(pluginDir, element.Name())
|
|
||||||
if element.IsDir() || filepath.Ext(element.Name()) != ".conf" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the config and search for a "data_format" entry
|
|
||||||
file, err := os.Open(path)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
scanner := bufio.NewScanner(file)
|
|
||||||
for scanner.Scan() {
|
|
||||||
match := re.FindStringSubmatch(scanner.Text())
|
|
||||||
if len(match) == 2 {
|
|
||||||
return match[1], nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return "", nil
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue