From a2125f0457c88c3633e8ee523d80a7b40263b558 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 20 Jun 2023 15:52:38 +0200 Subject: [PATCH] fix(custom_builder): Correctly handle serializers and parsers (#13446) --- plugins/processors/execd/README.md | 5 + plugins/processors/execd/sample.conf | 5 + tools/custom_builder/config.go | 135 ++++++++++++++++----------- tools/custom_builder/main.go | 12 +-- tools/custom_builder/packages.go | 109 +++++---------------- 5 files changed, 116 insertions(+), 150 deletions(-) diff --git a/plugins/processors/execd/README.md b/plugins/processors/execd/README.md index 63010abe2..0b0a0843f 100644 --- a/plugins/processors/execd/README.md +++ b/plugins/processors/execd/README.md @@ -47,6 +47,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Delay before the process is restarted after an unexpected termination # restart_delay = "10s" + + ## Serialization format for communicating with the executed program + ## Please note that the corresponding data-format must exist both in + ## parsers and serializers + # data_format = "influx" ``` ## Example diff --git a/plugins/processors/execd/sample.conf b/plugins/processors/execd/sample.conf index 419e6b67f..5c884a7b0 100644 --- a/plugins/processors/execd/sample.conf +++ b/plugins/processors/execd/sample.conf @@ -13,3 +13,8 @@ ## Delay before the process is restarted after an unexpected termination # restart_delay = "10s" + + ## Serialization format for communicating with the executed program + ## Please note that the corresponding data-format must exist both in + ## parsers and serializers + # data_format = "influx" diff --git a/tools/custom_builder/config.go b/tools/custom_builder/config.go index 62825032e..5985980ef 100644 --- a/tools/custom_builder/config.go +++ b/tools/custom_builder/config.go @@ -13,15 +13,20 @@ import ( "github.com/influxdata/telegraf/config" ) -type pluginState map[string]bool -type selection map[string]pluginState +type instance struct { + category string + name string + enabled bool + dataformat string +} + +type selection struct { + plugins map[string][]instance +} func ImportConfigurations(files, dirs []string) (*selection, int, error) { - sel := selection(make(map[string]pluginState)) - - // Initialize the categories - for _, category := range categories { - sel[category] = make(map[string]bool) + sel := &selection{ + plugins: make(map[string][]instance), } // Gather all configuration files @@ -44,12 +49,12 @@ func ImportConfigurations(files, dirs []string) (*selection, int, error) { } } if len(filenames) == 0 { - return &sel, 0, errors.New("no configuration files given or found") + return sel, 0, errors.New("no configuration files given or found") } // Do the actual import err := sel.importFiles(filenames) - return &sel, len(filenames), err + return sel, len(filenames), err } func (s *selection) Filter(p packageCollection) *packageCollection { @@ -57,51 +62,68 @@ func (s *selection) Filter(p packageCollection) *packageCollection { packages: map[string][]packageInfo{}, } + implicitlyConfigured := make(map[string]bool) for category, pkgs := range p.packages { - var categoryEnabledPackages []packageInfo - settings := (*s)[category] for _, pkg := range pkgs { - if _, found := settings[pkg.Plugin]; found { - categoryEnabledPackages = append(categoryEnabledPackages, pkg) + key := category + "." + pkg.Plugin + instances, found := s.plugins[key] + if !found { + continue } - } - enabled.packages[category] = categoryEnabledPackages - } - // Make sure we update the list of default parsers and serializers used by - // the remaining packages - enabled.FillDefaultParsers() - enabled.FillDefaultSerializers() + // The package was configured so add it to the enabled list + enabled.packages[category] = append(enabled.packages[category], pkg) - // If the user did not configure any parser, we want to include - // the default parsers if any to preserve a functional set of - // plugins. - if len(enabled.packages["parsers"]) == 0 && len(enabled.defaultParsers) > 0 { - var parsers []packageInfo - for _, pkg := range p.packages["parsers"] { - for _, name := range enabled.defaultParsers { - if pkg.Plugin == name { - parsers = append(parsers, pkg) - break + // Check if the instances configured a data-format and decide if it + // is a parser or serializer depending on the plugin type. + // If no data-format was configured, check the default settings in + // case this plugin supports a data-format setting but the user + // didn't set it. + for _, instance := range instances { + parser := pkg.DefaultParser + serializer := pkg.DefaultSerializer + if instance.dataformat != "" { + switch category { + case "inputs": + parser = instance.dataformat + case "processors": + parser = instance.dataformat + // The execd processor requires both a parser and serializer + if pkg.Plugin == "execd" { + serializer = instance.dataformat + } + case "outputs": + serializer = instance.dataformat + } + } + if parser != "" { + implicitlyConfigured["parsers."+parser] = true + } + if serializer != "" { + implicitlyConfigured["serializers."+serializer] = true } } } - enabled.packages["parsers"] = parsers } - // If the user did not configure any serializer, we want to include - // the default one if any to preserve a functional set of plugins. - if len(enabled.packages["serializers"]) == 0 && len(enabled.defaultSerializers) > 0 { - var serializers []packageInfo - for _, pkg := range p.packages["serializers"] { - for _, name := range enabled.defaultSerializers { - if pkg.Plugin == name { - serializers = append(serializers, pkg) - break - } + // Iterate over all plugins AGAIN to add the implicitly configured packages + // such as parsers and serializers + for category, pkgs := range p.packages { + for _, pkg := range pkgs { + key := category + "." + pkg.Plugin + + // Skip the plugins that were explicitly configured as we already + // added them above. + if _, found := s.plugins[key]; found { + continue + } + + // Add the package if it was implicitly configured e.g. by a + // 'data_format' setting or by a default value for the data-format + if _, implicit := implicitlyConfigured[key]; implicit { + enabled.packages[category] = append(enabled.packages[category], pkg) } } - enabled.packages["serializers"] = serializers } return &enabled @@ -134,31 +156,32 @@ func (s *selection) extractPluginsFromConfig(buf []byte) error { continue } - if _, found := (*s)[category]; !found { - continue - } - for name, data := range categoryTbl.Fields { - (*s)[category][name] = true + key := category + "." + name + cfg := instance{ + category: category, + name: name, + enabled: true, + } - // We need to check the data_format field to get all required parsers - switch category { - case "inputs", "processors": - pluginTables, ok := data.([]*ast.Table) - if !ok { - continue - } + // We need to check the data_format field to get all required + // parsers and serializers + pluginTables, ok := data.([]*ast.Table) + if ok { for _, subsubtbl := range pluginTables { + var dataformat string for field, fieldData := range subsubtbl.Fields { if field != "data_format" { continue } kv := fieldData.(*ast.KeyValue) - name := kv.Value.(*ast.String) - (*s)["parsers"][name.Value] = true + option := kv.Value.(*ast.String) + dataformat = option.Value } + cfg.dataformat = dataformat } } + s.plugins[key] = append(s.plugins[key], cfg) } } diff --git a/tools/custom_builder/main.go b/tools/custom_builder/main.go index 1892b9461..3b0327724 100644 --- a/tools/custom_builder/main.go +++ b/tools/custom_builder/main.go @@ -93,6 +93,12 @@ func main() { log.Fatalln("No configuration specified!") } + // Collect all available plugins + packages := packageCollection{} + if err := packages.CollectAvailable(); err != nil { + log.Fatalf("Collecting plugins failed: %v", err) + } + // Import the plugin list from Telegraf configuration files log.Println("Importing configuration file(s)...") cfg, nfiles, err := ImportConfigurations(configFiles, configDirs) @@ -108,12 +114,6 @@ func main() { log.Fatalln("No configuration files loaded!") } - // Collect all available plugins - packages := packageCollection{} - if err := packages.CollectAvailable(); err != nil { - log.Fatalf("Collecting plugins failed: %v", err) - } - // Process the plugin list with the given config. This will // only keep the plugins that adhere to the filtering criteria. enabled := cfg.Filter(packages) diff --git a/tools/custom_builder/packages.go b/tools/custom_builder/packages.go index 5a952915c..fb058f2a2 100644 --- a/tools/custom_builder/packages.go +++ b/tools/custom_builder/packages.go @@ -36,9 +36,7 @@ type packageInfo struct { } type packageCollection struct { - packages map[string][]packageInfo - defaultParsers []string - defaultSerializers []string + packages map[string][]packageInfo } // Define the package exceptions @@ -105,18 +103,28 @@ func (p *packageCollection) collectPackagesForCategory(category string) error { // as well as serializers for the output package var defaultParser, defaultSerializer string switch category { - case "inputs", "processors": - var err error - defaultParser, err = extractDefaultParser(path) + case "inputs": + dataformat, err := extractDefaultDataFormat(path) if err != nil { - log.Printf("Getting default parser for %s.%s failed: %v", category, name, err) + log.Printf("Getting default data-format for %s.%s failed: %v", category, name, err) + } + defaultParser = dataformat + case "processors": + dataformat, err := extractDefaultDataFormat(path) + if err != nil { + log.Printf("Getting default data-format for %s.%s failed: %v", category, name, err) + } + defaultParser = dataformat + // The execd processor requires both a parser and serializer + if name == "execd" { + defaultSerializer = dataformat } case "outputs": - var err error - defaultSerializer, err = extractDefaultSerializer(path) + dataformat, err := extractDefaultDataFormat(path) if err != nil { - log.Printf("Getting default serializer for %s.%s failed: %v", category, name, err) + log.Printf("Getting default data-format for %s.%s failed: %v", category, name, err) } + defaultSerializer = dataformat } for _, plugin := range registeredNames { @@ -138,46 +146,6 @@ func (p *packageCollection) collectPackagesForCategory(category string) error { return nil } -func (p *packageCollection) FillDefaultParsers() { - // Make sure we ignore all empty-named parsers which indicate - // that there is no parser used by the plugin. - parsers := map[string]bool{"": true} - - // Iterate over all plugins that may have parsers and collect - // the defaults - p.defaultParsers = make([]string, 0) - for _, category := range []string{"inputs", "processors"} { - for _, pkg := range p.packages[category] { - name := pkg.DefaultParser - if seen := parsers[name]; seen { - continue - } - p.defaultParsers = append(p.defaultParsers, name) - parsers[name] = true - } - } -} - -func (p *packageCollection) FillDefaultSerializers() { - // Make sure we ignore all empty-named parsers which indicate - // that there is no parser used by the plugin. - serializers := map[string]bool{"": true} - - // Iterate over all plugins that may have parsers and collect - // the defaults - p.defaultSerializers = make([]string, 0) - for _, category := range []string{"outputs"} { - for _, pkg := range p.packages[category] { - name := pkg.DefaultSerializer - if seen := serializers[name]; seen { - continue - } - p.defaultSerializers = append(p.defaultSerializers, name) - serializers[name] = true - } - } -} - func (p *packageCollection) CollectAvailable() error { p.packages = make(map[string][]packageInfo) @@ -187,9 +155,6 @@ func (p *packageCollection) CollectAvailable() error { } } - p.FillDefaultParsers() - p.FillDefaultSerializers() - return nil } @@ -347,11 +312,11 @@ func extractRegisteredNames(pkg *ast.Package, pluginType string) []string { return registeredNames } -func extractDefaultParser(pluginDir string) (string, error) { +func extractDefaultDataFormat(pluginDir string) (string, error) { re := regexp.MustCompile(`^\s*#?\s*data_format\s*=\s*"(.*)"\s*$`) - // Exception for exec which uses JSON by default - if filepath.Base(pluginDir) == "exec" { + // Exception for exec input which uses JSON by default + if filepath.ToSlash(pluginDir) == "plugins/inputs/exec" { return "json", nil } @@ -383,35 +348,3 @@ func extractDefaultParser(pluginDir string) (string, error) { return "", nil } - -func extractDefaultSerializer(pluginDir string) (string, error) { - re := regexp.MustCompile(`^\s*#?\s*data_format\s*=\s*"(.*)"\s*$`) - - // Walk all config files in the package directory - elements, err := os.ReadDir(pluginDir) - if err != nil { - return "", err - } - - for _, element := range elements { - path := filepath.Join(pluginDir, element.Name()) - if element.IsDir() || filepath.Ext(element.Name()) != ".conf" { - continue - } - - // Read the config and search for a "data_format" entry - file, err := os.Open(path) - if err != nil { - return "", err - } - scanner := bufio.NewScanner(file) - for scanner.Scan() { - match := re.FindStringSubmatch(scanner.Text()) - if len(match) == 2 { - return match[1], nil - } - } - } - - return "", nil -}