fix(tools.custom_builder): Handle multiple instance of the same plugin correctly (#15630)
This commit is contained in:
parent
ee2b806d68
commit
a3a8a8c465
|
|
@ -18,7 +18,7 @@ type instance struct {
|
||||||
category string
|
category string
|
||||||
name string
|
name string
|
||||||
enabled bool
|
enabled bool
|
||||||
dataformat string
|
dataformat []string
|
||||||
}
|
}
|
||||||
|
|
||||||
type selection struct {
|
type selection struct {
|
||||||
|
|
@ -81,27 +81,27 @@ func (s *selection) Filter(p packageCollection) (*packageCollection, error) {
|
||||||
// case this plugin supports a data-format setting but the user
|
// case this plugin supports a data-format setting but the user
|
||||||
// didn't set it.
|
// didn't set it.
|
||||||
for _, instance := range instances {
|
for _, instance := range instances {
|
||||||
parser := pkg.DefaultParser
|
for _, dataformat := range instance.dataformat {
|
||||||
serializer := pkg.DefaultSerializer
|
|
||||||
if instance.dataformat != "" {
|
|
||||||
switch category {
|
switch category {
|
||||||
case "inputs":
|
case "inputs":
|
||||||
parser = instance.dataformat
|
implicitlyConfigured["parsers."+dataformat] = true
|
||||||
case "processors":
|
case "processors":
|
||||||
parser = instance.dataformat
|
implicitlyConfigured["parsers."+dataformat] = true
|
||||||
// The execd processor requires both a parser and serializer
|
// The execd processor requires both a parser and serializer
|
||||||
if pkg.Plugin == "execd" {
|
if pkg.Plugin == "execd" {
|
||||||
serializer = instance.dataformat
|
implicitlyConfigured["serializers."+dataformat] = true
|
||||||
}
|
}
|
||||||
case "outputs":
|
case "outputs":
|
||||||
serializer = instance.dataformat
|
implicitlyConfigured["serializers."+dataformat] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if parser != "" {
|
if len(instance.dataformat) == 0 {
|
||||||
implicitlyConfigured["parsers."+parser] = true
|
if pkg.DefaultParser != "" {
|
||||||
|
implicitlyConfigured["parsers."+pkg.DefaultParser] = true
|
||||||
|
}
|
||||||
|
if pkg.DefaultSerializer != "" {
|
||||||
|
implicitlyConfigured["serializers."+pkg.DefaultSerializer] = true
|
||||||
}
|
}
|
||||||
if serializer != "" {
|
|
||||||
implicitlyConfigured["serializers."+serializer] = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -215,7 +215,9 @@ func (s *selection) extractPluginsFromConfig(buf []byte) error {
|
||||||
option := kv.Value.(*ast.String)
|
option := kv.Value.(*ast.String)
|
||||||
dataformat = option.Value
|
dataformat = option.Value
|
||||||
}
|
}
|
||||||
cfg.dataformat = dataformat
|
if dataformat != "" {
|
||||||
|
cfg.dataformat = append(cfg.dataformat, dataformat)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.plugins[key] = append(s.plugins[key], cfg)
|
s.plugins[key] = append(s.plugins[key], cfg)
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
|
@ -62,83 +63,57 @@ func usage() {
|
||||||
fmt.Fprintln(flag.CommandLine.Output(), "")
|
fmt.Fprintln(flag.CommandLine.Output(), "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
type cmdConfig struct {
|
||||||
var dryrun, showtags, migrations, quiet bool
|
dryrun bool
|
||||||
var configFiles, configDirs []string
|
showtags bool
|
||||||
|
migrations bool
|
||||||
|
quiet bool
|
||||||
|
root string
|
||||||
|
configFiles []string
|
||||||
|
configDirs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var cfg cmdConfig
|
||||||
flag.Func("config",
|
flag.Func("config",
|
||||||
"Import plugins from configuration file (can be used multiple times)",
|
"Import plugins from configuration file (can be used multiple times)",
|
||||||
func(s string) error {
|
func(s string) error {
|
||||||
configFiles = append(configFiles, s)
|
cfg.configFiles = append(cfg.configFiles, s)
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
flag.Func("config-dir",
|
flag.Func("config-dir",
|
||||||
"Import plugins from configs in the given directory (can be used multiple times)",
|
"Import plugins from configs in the given directory (can be used multiple times)",
|
||||||
func(s string) error {
|
func(s string) error {
|
||||||
configDirs = append(configDirs, s)
|
cfg.configDirs = append(cfg.configDirs, s)
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
flag.BoolVar(&dryrun, "dry-run", false, "Skip the actual building step")
|
flag.BoolVar(&cfg.dryrun, "dry-run", false, "Skip the actual building step")
|
||||||
flag.BoolVar(&quiet, "quiet", false, "Print fewer log messages")
|
flag.BoolVar(&cfg.quiet, "quiet", false, "Print fewer log messages")
|
||||||
flag.BoolVar(&migrations, "migrations", false, "Include configuration migrations")
|
flag.BoolVar(&cfg.migrations, "migrations", false, "Include configuration migrations")
|
||||||
flag.BoolVar(&showtags, "tags", false, "Show build-tags used")
|
flag.BoolVar(&cfg.showtags, "tags", false, "Show build-tags used")
|
||||||
|
|
||||||
flag.Usage = usage
|
flag.Usage = usage
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// Check configuration options
|
tagset, err := process(&cfg)
|
||||||
if len(configFiles) == 0 && len(configDirs) == 0 {
|
|
||||||
log.Fatalln("No configuration specified!")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect all available plugins
|
|
||||||
packages := packageCollection{}
|
|
||||||
if err := packages.CollectAvailable(); err != nil {
|
|
||||||
log.Fatalf("Collecting plugins failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Import the plugin list from Telegraf configuration files
|
|
||||||
log.Println("Importing configuration file(s)...")
|
|
||||||
cfg, nfiles, err := ImportConfigurations(configFiles, configDirs)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Importing configuration(s) failed: %v", err)
|
log.Fatalln(err)
|
||||||
}
|
}
|
||||||
if !quiet {
|
|
||||||
log.Printf("Found %d configuration files...", nfiles)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we do have a config
|
|
||||||
if nfiles == 0 {
|
|
||||||
log.Fatalln("No configuration files loaded!")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process the plugin list with the given config. This will
|
|
||||||
// only keep the plugins that adhere to the filtering criteria.
|
|
||||||
enabled, err := cfg.Filter(packages)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Filtering packages failed: %v", err)
|
|
||||||
}
|
|
||||||
if !quiet {
|
|
||||||
enabled.Print()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract the build-tags
|
|
||||||
tagset := enabled.ExtractTags()
|
|
||||||
if len(tagset) == 0 {
|
if len(tagset) == 0 {
|
||||||
log.Fatalln("Nothing selected!")
|
log.Fatalln("Nothing selected!")
|
||||||
}
|
}
|
||||||
tags := "custom,"
|
tags := "custom,"
|
||||||
if migrations {
|
if cfg.migrations {
|
||||||
tags += "migrations,"
|
tags += "migrations,"
|
||||||
}
|
}
|
||||||
tags += strings.Join(tagset, ",")
|
tags += strings.Join(tagset, ",")
|
||||||
if showtags {
|
if cfg.showtags {
|
||||||
fmt.Printf("Build tags: %s\n", tags)
|
fmt.Printf("Build tags: %s\n", tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !dryrun {
|
if !cfg.dryrun {
|
||||||
// Perform the build
|
// Perform the build
|
||||||
var out bytes.Buffer
|
var out bytes.Buffer
|
||||||
makeCmd := exec.Command("make", buildTargets...)
|
makeCmd := exec.Command("make", buildTargets...)
|
||||||
|
|
@ -146,17 +121,58 @@ func main() {
|
||||||
makeCmd.Stdout = &out
|
makeCmd.Stdout = &out
|
||||||
makeCmd.Stderr = &out
|
makeCmd.Stderr = &out
|
||||||
|
|
||||||
if !quiet {
|
if !cfg.quiet {
|
||||||
log.Println("Running build...")
|
log.Println("Running build...")
|
||||||
}
|
}
|
||||||
if err := makeCmd.Run(); err != nil {
|
if err := makeCmd.Run(); err != nil {
|
||||||
fmt.Println(out.String())
|
fmt.Println(out.String())
|
||||||
log.Fatalf("Running make failed: %v", err)
|
log.Fatalf("Running make failed: %v", err)
|
||||||
}
|
}
|
||||||
if !quiet {
|
if !cfg.quiet {
|
||||||
fmt.Println(out.String())
|
fmt.Println(out.String())
|
||||||
}
|
}
|
||||||
} else if !quiet {
|
} else if !cfg.quiet {
|
||||||
log.Println("DRY-RUN: Skipping build.")
|
log.Println("DRY-RUN: Skipping build.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func process(cmdcfg *cmdConfig) ([]string, error) {
|
||||||
|
// Check configuration options
|
||||||
|
if len(cmdcfg.configFiles) == 0 && len(cmdcfg.configDirs) == 0 {
|
||||||
|
return nil, errors.New("no configuration specified")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect all available plugins
|
||||||
|
packages := packageCollection{root: cmdcfg.root}
|
||||||
|
if err := packages.CollectAvailable(); err != nil {
|
||||||
|
return nil, fmt.Errorf("collecting plugins failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Import the plugin list from Telegraf configuration files
|
||||||
|
log.Println("Importing configuration file(s)...")
|
||||||
|
cfg, nfiles, err := ImportConfigurations(cmdcfg.configFiles, cmdcfg.configDirs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("importing configuration(s) failed: %w", err)
|
||||||
|
}
|
||||||
|
if !cmdcfg.quiet {
|
||||||
|
log.Printf("Found %d configuration files...", nfiles)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we do have a config
|
||||||
|
if nfiles == 0 {
|
||||||
|
return nil, errors.New("no configuration files loaded")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process the plugin list with the given config. This will
|
||||||
|
// only keep the plugins that adhere to the filtering criteria.
|
||||||
|
enabled, err := cfg.Filter(packages)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("filtering packages failed: %w", err)
|
||||||
|
}
|
||||||
|
if !cmdcfg.quiet {
|
||||||
|
enabled.Print()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the build-tags
|
||||||
|
return enabled.ExtractTags(), nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,56 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCases(t *testing.T) {
|
||||||
|
// Silence the output
|
||||||
|
log.SetOutput(io.Discard)
|
||||||
|
|
||||||
|
// Get all directories in testdata
|
||||||
|
folders, err := os.ReadDir("testcases")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for _, f := range folders {
|
||||||
|
// Only handle folders
|
||||||
|
if !f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
configFilename := filepath.Join("testcases", f.Name(), "telegraf.conf")
|
||||||
|
expecedFilename := filepath.Join("testcases", f.Name(), "expected.tags")
|
||||||
|
|
||||||
|
t.Run(f.Name(), func(t *testing.T) {
|
||||||
|
// Read the expected output
|
||||||
|
file, err := os.Open(expecedFilename)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
var expected []string
|
||||||
|
scanner := bufio.NewScanner(file)
|
||||||
|
for scanner.Scan() {
|
||||||
|
expected = append(expected, scanner.Text())
|
||||||
|
}
|
||||||
|
require.NoError(t, scanner.Err())
|
||||||
|
|
||||||
|
// Configure the command
|
||||||
|
cfg := &cmdConfig{
|
||||||
|
dryrun: true,
|
||||||
|
quiet: true,
|
||||||
|
configFiles: []string{configFilename},
|
||||||
|
root: "../..",
|
||||||
|
}
|
||||||
|
|
||||||
|
actual, err := process(cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -36,6 +36,7 @@ type packageInfo struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type packageCollection struct {
|
type packageCollection struct {
|
||||||
|
root string
|
||||||
packages map[string][]packageInfo
|
packages map[string][]packageInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -53,7 +54,7 @@ var exceptions = map[string][]packageInfo{
|
||||||
|
|
||||||
func (p *packageCollection) collectPackagesForCategory(category string) error {
|
func (p *packageCollection) collectPackagesForCategory(category string) error {
|
||||||
var entries []packageInfo
|
var entries []packageInfo
|
||||||
pluginDir := filepath.Join("plugins", category)
|
pluginDir := filepath.Join(p.root, "plugins", category)
|
||||||
|
|
||||||
// Add exceptional packages if any
|
// Add exceptional packages if any
|
||||||
if pkgs, found := exceptions[category]; found {
|
if pkgs, found := exceptions[category]; found {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
inputs.disk
|
||||||
|
inputs.mem
|
||||||
|
inputs.swap
|
||||||
|
inputs.system
|
||||||
|
outputs.datadog
|
||||||
|
|
@ -0,0 +1,65 @@
|
||||||
|
## Telegraf Configuration for ThinClients
|
||||||
|
## /etc/telegraf/telegraf.conf
|
||||||
|
|
||||||
|
[global_tags]
|
||||||
|
service_name = "thinclient"
|
||||||
|
env = "prod"
|
||||||
|
team = "planetexpress"
|
||||||
|
|
||||||
|
## Configuration for telegraf agent
|
||||||
|
[agent]
|
||||||
|
## Data input and output settings
|
||||||
|
interval = "10s"
|
||||||
|
round_interval = true
|
||||||
|
metric_batch_size = 1000
|
||||||
|
metric_buffer_limit = 10000
|
||||||
|
collection_jitter = "0s"
|
||||||
|
flush_interval = "10s"
|
||||||
|
flush_jitter = "5s"
|
||||||
|
|
||||||
|
## Logging configuration
|
||||||
|
debug = false
|
||||||
|
quiet = false
|
||||||
|
# emtpy string means log to stderr
|
||||||
|
logfile = ""
|
||||||
|
|
||||||
|
## host configuration
|
||||||
|
# if emtpty use os.hostname()
|
||||||
|
hostname = ""
|
||||||
|
|
||||||
|
omit_hostname = false
|
||||||
|
|
||||||
|
# Configuration for sending metrics to Datadog
|
||||||
|
[[outputs.datadog]]
|
||||||
|
## Datadog API key
|
||||||
|
apikey = "${datadog_secret}"
|
||||||
|
|
||||||
|
## Connection timeout.
|
||||||
|
timeout = "5s"
|
||||||
|
|
||||||
|
|
||||||
|
## Write URL override; useful for debugging.
|
||||||
|
url = "${datadog_url}"
|
||||||
|
|
||||||
|
## Metrics to log
|
||||||
|
|
||||||
|
[[inputs.system]]
|
||||||
|
name_prefix = "dg.systemengineering.thinclient."
|
||||||
|
# default configuration; getting uptime values.
|
||||||
|
|
||||||
|
[[inputs.mem]]
|
||||||
|
name_prefix = "dg.systemengineering.thinclient."
|
||||||
|
# no configuration
|
||||||
|
|
||||||
|
[[inputs.disk]]
|
||||||
|
name_prefix = "dg.systemengineering.thinclient."
|
||||||
|
## By default stats will be gathered for all mount points.
|
||||||
|
## Set mount_points will restrict the stats to only the specified mount points.
|
||||||
|
mount_points = ["/"]
|
||||||
|
|
||||||
|
[[inputs.swap]]
|
||||||
|
name_prefix = "dg.systemengineering.thinclient."
|
||||||
|
## Monitoring SWAP (zswap) usage
|
||||||
|
|
||||||
|
## Ignore mount points by filesystem type.
|
||||||
|
#ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
inputs.mqtt_consumer
|
||||||
|
outputs.influxdb_v2
|
||||||
|
parsers.json_v2
|
||||||
|
parsers.value
|
||||||
|
|
@ -0,0 +1,39 @@
|
||||||
|
[[inputs.mqtt_consumer]]
|
||||||
|
name_override = "qr_mqtt_message"
|
||||||
|
servers = ["tcp://mosquitto:1883"]
|
||||||
|
topics = [
|
||||||
|
"<REDACTED>"
|
||||||
|
]
|
||||||
|
|
||||||
|
qos = 2
|
||||||
|
persistent_session = false
|
||||||
|
client_id = "telegraf_qr_code"
|
||||||
|
|
||||||
|
data_format = "json_v2"
|
||||||
|
|
||||||
|
[[inputs.mqtt_consumer.json_v2]]
|
||||||
|
[[inputs.mqtt_consumer.json_v2.object]]
|
||||||
|
path = "message.data"
|
||||||
|
tags = ["data"]
|
||||||
|
|
||||||
|
[[inputs.mqtt_consumer]]
|
||||||
|
name_override = "raw_mqtt_message"
|
||||||
|
servers = ["tcp://mosquitto:1883"]
|
||||||
|
|
||||||
|
# Capture the content as a string since we do not know the format of it...
|
||||||
|
data_format = "value"
|
||||||
|
data_type = "string"
|
||||||
|
|
||||||
|
# Capture all topics and store the topic as a tag with name "topic"...
|
||||||
|
topics = ["#"]
|
||||||
|
topic_tag = "topic"
|
||||||
|
|
||||||
|
qos = 2
|
||||||
|
persistent_session = false
|
||||||
|
client_id = "telegraf_generic"
|
||||||
|
|
||||||
|
[[outputs.influxdb_v2]]
|
||||||
|
urls = ["http://influxdb:8086"]
|
||||||
|
token = "${INFLUX_TOKEN}"
|
||||||
|
organization = "test"
|
||||||
|
bucket = "test_bucket"
|
||||||
Loading…
Reference in New Issue