diff --git a/cmd/telegraf/cmd_config.go b/cmd/telegraf/cmd_config.go new file mode 100644 index 000000000..49da0cc34 --- /dev/null +++ b/cmd/telegraf/cmd_config.go @@ -0,0 +1,174 @@ +// Command handling for configuration "config" command +package main + +import ( + "errors" + "fmt" + "io" + "log" + "net/url" + "os" + "path/filepath" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/logger" + "github.com/influxdata/telegraf/migrations" + "github.com/urfave/cli/v2" +) + +func getConfigCommands(pluginFilterFlags []cli.Flag, outputBuffer io.Writer) []*cli.Command { + return []*cli.Command{ + { + Name: "config", + Usage: "commands for generating and migrating configurations", + Flags: pluginFilterFlags, + Action: func(cCtx *cli.Context) error { + // The sub_Filters are populated when the filter flags are set after the subcommand config + // e.g. telegraf config --section-filter inputs + filters := processFilterFlags(cCtx) + + printSampleConfig(outputBuffer, filters) + return nil + }, + Subcommands: []*cli.Command{ + { + Name: "create", + Usage: "create a full sample configuration and show it", + Description: ` +The 'create' produces a full configuration containing all plugins as an example +and shows it on the console. You may apply 'section' or 'plugin' filtering +to reduce the output to the plugins you need + +Create the full configuration + +> telegraf config create + +To produce a configuration only containing a Modbus input plugin and an +InfluxDB v2 output plugin use + +> telegraf config create --section-filter "inputs:outputs" --input-filter "modbus" --output-filter "influxdb_v2" +`, + Flags: pluginFilterFlags, + Action: func(cCtx *cli.Context) error { + filters := processFilterFlags(cCtx) + + printSampleConfig(outputBuffer, filters) + return nil + }, + }, + { + Name: "migrate", + Usage: "migrate deprecated plugins and options of the configuration(s)", + Description: ` +The 'migrate' command reads the configuration files specified via '--config' or +'--config-directory' and tries to migrate plugins or options that are currently +deprecated using the recommended replacements. If no configuration file is +explicitly specified the command reads the default locations and uses those +configuration files. Migrated files are stored with a '.migrated' suffix at the +location of the inputs. If you are migrating remote configurations the migrated +configurations is stored in the current directory using the filename of the URL +with a '.migrated' suffix. +It is highly recommended to test those migrated configurations before using +those files unattended! + +To migrate the file 'mysettings.conf' use + +> telegraf --config mysettings.conf config migrate +`, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "force", + Usage: "forces overwriting of an existing migration file", + }, + }, + Action: func(cCtx *cli.Context) error { + // Setup logging + telegraf.Debug = cCtx.Bool("debug") + logConfig := logger.LogConfig{Debug: telegraf.Debug} + if err := logger.SetupLogging(logConfig); err != nil { + return err + } + + // Check if we have migrations at all. There might be + // none if you run a custom build without migrations + // enabled. + if len(migrations.PluginMigrations) == 0 { + return errors.New("no migrations available") + } + log.Printf("%d plugin migration(s) available", len(migrations.PluginMigrations)) + + // Collect the given configuration files + configFiles := cCtx.StringSlice("config") + configDir := cCtx.StringSlice("config-directory") + for _, fConfigDirectory := range configDir { + files, err := config.WalkDirectory(fConfigDirectory) + if err != nil { + return err + } + configFiles = append(configFiles, files...) + } + + // If no "config" or "config-directory" flag(s) was + // provided we should load default configuration files + if len(configFiles) == 0 { + paths, err := config.GetDefaultConfigPath() + if err != nil { + return err + } + configFiles = paths + } + + for _, fn := range configFiles { + log.Printf("D! Trying to migrate %q...", fn) + + // Read and parse the config file + data, remote, err := config.LoadConfigFile(fn) + if err != nil { + return fmt.Errorf("opening input %q failed: %w", fn, err) + } + + out, applied, err := config.ApplyMigrations(data) + if err != nil { + return err + } + + // Do not write a migration file if nothing was done + if applied == 0 { + log.Printf("I! No migration applied for %q", fn) + continue + } + + // Construct the output filename + // For remote locations we just save the filename + // with the migrated suffix. + outfn := fn + ".migrated" + if remote { + u, err := url.Parse(fn) + if err != nil { + return fmt.Errorf("parsing remote config URL %q failed: %w", fn, err) + } + outfn = filepath.Base(u.Path) + ".migrated" + } + + log.Printf("I! %d migration applied for %q, writing result as %q", applied, fn, outfn) + + // Make sure the file does not exist yet if we should not overwrite + if !cCtx.Bool("force") { + if _, err := os.Stat(outfn); !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("output file %q already exists", outfn) + } + } + + // Write the output file + if err := os.WriteFile(outfn, out, 0640); err != nil { + return fmt.Errorf("writing output %q failed: %w", outfn, err) + } + } + return nil + }, + }, + }, + }, + } +} diff --git a/cmd/telegraf/cmd_secretstore.go b/cmd/telegraf/cmd_secretstore.go index a1b73e3a0..1eb906860 100644 --- a/cmd/telegraf/cmd_secretstore.go +++ b/cmd/telegraf/cmd_secretstore.go @@ -1,4 +1,4 @@ -// Command handling for secret-stores' "secret" command +// Command handling for secret-stores' "secrets" command package main import ( diff --git a/cmd/telegraf/main.go b/cmd/telegraf/main.go index 6fed209d5..f99acd9bf 100644 --- a/cmd/telegraf/main.go +++ b/cmd/telegraf/main.go @@ -245,6 +245,11 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi return m.Run() } + commands := append( + getConfigCommands(pluginFilterFlags, outputBuffer), + getSecretStoreCommands(m)..., + ) + app := &cli.App{ Name: "Telegraf", Usage: "The plugin-driven server agent for collecting & reporting metrics.", @@ -341,19 +346,6 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi }, extraFlags...), Action: action, Commands: append([]*cli.Command{ - { - Name: "config", - Usage: "print out full sample configuration to stdout", - Flags: pluginFilterFlags, - Action: func(cCtx *cli.Context) error { - // The sub_Filters are populated when the filter flags are set after the subcommand config - // e.g. telegraf config --section-filter inputs - filters := processFilterFlags(cCtx) - - printSampleConfig(outputBuffer, filters) - return nil - }, - }, { Name: "version", Usage: "print current version to stdout", @@ -362,9 +354,7 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi return nil }, }, - }, - getSecretStoreCommands(m)..., - ), + }, commands...), } // Make sure we safely erase secrets diff --git a/config/config.go b/config/config.go index 04c187e19..71a271050 100644 --- a/config/config.go +++ b/config/config.go @@ -376,7 +376,7 @@ func WalkDirectory(path string) ([]string, error) { // 1. $TELEGRAF_CONFIG_PATH // 2. $HOME/.telegraf/telegraf.conf // 3. /etc/telegraf/telegraf.conf and /etc/telegraf/telegraf.d/*.conf -func getDefaultConfigPath() ([]string, error) { +func GetDefaultConfigPath() ([]string, error) { envfile := os.Getenv("TELEGRAF_CONFIG_PATH") homefile := os.ExpandEnv("${HOME}/.telegraf/telegraf.conf") etcfile := "/etc/telegraf/telegraf.conf" @@ -434,7 +434,7 @@ func (c *Config) LoadConfig(path string) error { paths := []string{} if path == "" { - if paths, err = getDefaultConfigPath(); err != nil { + if paths, err = GetDefaultConfigPath(); err != nil { return err } } else { @@ -446,7 +446,7 @@ func (c *Config) LoadConfig(path string) error { log.Printf("I! Loading config: %s", path) } - data, err := LoadConfigFile(path) + data, _, err := LoadConfigFile(path) if err != nil { return fmt.Errorf("error loading config file %s: %w", path, err) } @@ -696,33 +696,37 @@ func trimBOM(f []byte) []byte { return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf")) } -func LoadConfigFile(config string) ([]byte, error) { +// LoadConfigFile loads the content of a configuration file and returns it +// together with a flag denoting if the file is from a remote location such +// as a web server. +func LoadConfigFile(config string) ([]byte, bool, error) { if fetchURLRe.MatchString(config) { u, err := url.Parse(config) if err != nil { - return nil, err + return nil, true, err } switch u.Scheme { case "https", "http": - return fetchConfig(u) + data, err := fetchConfig(u) + return data, true, err default: - return nil, fmt.Errorf("scheme %q not supported", u.Scheme) + return nil, true, fmt.Errorf("scheme %q not supported", u.Scheme) } } // If it isn't a https scheme, try it as a file buffer, err := os.ReadFile(config) if err != nil { - return nil, err + return nil, false, err } mimeType := http.DetectContentType(buffer) if !strings.Contains(mimeType, "text/plain") { - return nil, fmt.Errorf("provided config is not a TOML file: %s", config) + return nil, false, fmt.Errorf("provided config is not a TOML file: %s", config) } - return buffer, nil + return buffer, false, nil } func fetchConfig(u *url.URL) ([]byte, error) { diff --git a/config/config_test.go b/config/config_test.go index 6937ee502..0d989db20 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -3,6 +3,8 @@ package config_test import ( "bytes" "fmt" + "net/http" + "net/http/httptest" "os" "os/exec" "path/filepath" @@ -441,6 +443,21 @@ func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) { } } +func TestGetDefaultConfigPathFromEnvURL(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + c := config.NewConfig() + t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL) + configPath, err := config.GetDefaultConfigPath() + require.NoError(t, err) + require.Equal(t, []string{ts.URL}, configPath) + err = c.LoadConfig("") + require.NoError(t, err) +} + func TestConfig_URLLikeFileName(t *testing.T) { c := config.NewConfig() err := c.LoadConfig("http:##www.example.com.conf") diff --git a/config/internal_test.go b/config/internal_test.go index b6e943848..2d04926f1 100644 --- a/config/internal_test.go +++ b/config/internal_test.go @@ -135,18 +135,3 @@ func TestURLRetries3FailsThenPasses(t *testing.T) { require.NoError(t, c.LoadConfig(ts.URL)) require.Equal(t, 4, responseCounter) } - -func TestConfig_getDefaultConfigPathFromEnvURL(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - })) - defer ts.Close() - - c := NewConfig() - t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL) - configPath, err := getDefaultConfigPath() - require.NoError(t, err) - require.Equal(t, []string{ts.URL}, configPath) - err = c.LoadConfig("") - require.NoError(t, err) -} diff --git a/config/migration.go b/config/migration.go new file mode 100644 index 000000000..fe0be54eb --- /dev/null +++ b/config/migration.go @@ -0,0 +1,181 @@ +package config + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "log" + "sort" + "strings" + + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + + "github.com/influxdata/telegraf/migrations" + _ "github.com/influxdata/telegraf/migrations/all" // register all migrations +) + +type section struct { + name string + begin int + content *ast.Table + raw *bytes.Buffer +} + +func splitToSections(root *ast.Table) ([]section, error) { + var sections []section + for name, elements := range root.Fields { + switch name { + case "inputs", "outputs", "processors", "aggregators": + category, ok := elements.(*ast.Table) + if !ok { + return nil, fmt.Errorf("%q is not a table (%T)", name, category) + } + + for plugin, elements := range category.Fields { + tbls, ok := elements.([]*ast.Table) + if !ok { + return nil, fmt.Errorf("elements of \"%s.%s\" is not a list of tables (%T)", name, plugin, elements) + } + for _, tbl := range tbls { + s := section{ + name: name + "." + tbl.Name, + begin: tbl.Line, + content: tbl, + raw: &bytes.Buffer{}, + } + sections = append(sections, s) + } + } + default: + tbl, ok := elements.(*ast.Table) + if !ok { + return nil, fmt.Errorf("%q is not a table (%T)", name, elements) + } + s := section{ + name: name, + begin: tbl.Line, + content: tbl, + raw: &bytes.Buffer{}, + } + sections = append(sections, s) + } + } + + // Sort the TOML elements by begin (line-number) + sort.SliceStable(sections, func(i, j int) bool { return sections[i].begin < sections[j].begin }) + + return sections, nil +} + +func assignTextToSections(data []byte, sections []section) ([]section, error) { + // Now assign the raw text to each section + if sections[0].begin > 0 { + sections = append([]section{{ + name: "header", + begin: 0, + raw: &bytes.Buffer{}, + }}, sections...) + } + + var lineno int + scanner := bufio.NewScanner(bytes.NewBuffer(data)) + for idx, next := range sections[1:] { + var buf bytes.Buffer + for lineno < next.begin-1 { + if !scanner.Scan() { + break + } + lineno++ + + line := strings.TrimSpace(scanner.Text()) + if strings.HasPrefix(line, "#") { + _, _ = buf.Write(scanner.Bytes()) + _, _ = buf.WriteString("\n") + continue + } else if buf.Len() > 0 { + if _, err := io.Copy(sections[idx].raw, &buf); err != nil { + return nil, fmt.Errorf("copying buffer failed: %w", err) + } + buf.Reset() + } + + _, _ = sections[idx].raw.Write(scanner.Bytes()) + _, _ = sections[idx].raw.WriteString("\n") + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("splitting by line failed: %w", err) + } + + // If a comment is directly in front of the next section, without + // newline, the comment is assigned to the next section. + if buf.Len() > 0 { + if _, err := io.Copy(sections[idx+1].raw, &buf); err != nil { + return nil, fmt.Errorf("copying buffer failed: %w", err) + } + buf.Reset() + } + } + // Write the remaining to the last section + for scanner.Scan() { + _, _ = sections[len(sections)-1].raw.Write(scanner.Bytes()) + _, _ = sections[len(sections)-1].raw.WriteString("\n") + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("splitting by line failed: %w", err) + } + return sections, nil +} + +func ApplyMigrations(data []byte) ([]byte, uint64, error) { + root, err := toml.Parse(data) + if err != nil { + return nil, 0, fmt.Errorf("parsing failed: %w", err) + } + + // Split the configuration into sections containing the location + // in the file. + sections, err := splitToSections(root) + if err != nil { + return nil, 0, fmt.Errorf("splitting to sections failed: %w", err) + } + if len(sections) == 0 { + return nil, 0, errors.New("no TOML configuration found") + } + + // Assign the configuration text to the corresponding segments + sections, err = assignTextToSections(data, sections) + if err != nil { + return nil, 0, fmt.Errorf("assigning text failed: %w", err) + } + + // Do the actual migration(s) + var applied uint64 + for idx, s := range sections { + migrate, found := migrations.PluginMigrations[s.name] + if !found { + continue + } + + log.Printf("D! migrating plugin %q in line %d...", s.name, s.begin) + result, err := migrate(s.content) + if err != nil { + return nil, 0, fmt.Errorf("migrating %q (line %d) failed: %w", s.name, s.begin, err) + } + s.raw = bytes.NewBuffer(result) + sections[idx] = s + applied++ + } + + var buf bytes.Buffer + for _, s := range sections { + _, err = s.raw.WriteTo(&buf) + if err != nil { + return nil, applied, fmt.Errorf("joining output failed: %w", err) + } + } + + return buf.Bytes(), applied, nil +} diff --git a/migrations/all/all.go b/migrations/all/all.go new file mode 100644 index 000000000..1a6c64721 --- /dev/null +++ b/migrations/all/all.go @@ -0,0 +1 @@ +package all diff --git a/migrations/all/inputs_cassandra.go b/migrations/all/inputs_cassandra.go new file mode 100644 index 000000000..334b6a9bb --- /dev/null +++ b/migrations/all/inputs_cassandra.go @@ -0,0 +1,5 @@ +//go:build !custom || (migrations && (inputs || inputs.cassandra)) + +package all + +import _ "github.com/influxdata/telegraf/migrations/inputs_cassandra" // register migration diff --git a/migrations/common/filter_options.go b/migrations/common/filter_options.go new file mode 100644 index 000000000..7da9e8cc3 --- /dev/null +++ b/migrations/common/filter_options.go @@ -0,0 +1,15 @@ +package common + +type FilterOptions struct { + NamePass []string `toml:"namepass"` + NameDrop []string `toml:"namedrop"` + FieldPassOld []string `toml:"pass"` + FieldPass []string `toml:"fieldpass"` + FieldDropOld []string `toml:"drop"` + FieldDrop []string `toml:"fielddrop"` + TagPassFilters map[string][]string `toml:"tagpass"` + TagDropFilters map[string][]string `toml:"tagdrop"` + TagExclude []string `toml:"tagexclude"` + TagInclude []string `toml:"taginclude"` + MetricPass string `toml:"metricpass"` +} diff --git a/migrations/common/input_options.go b/migrations/common/input_options.go new file mode 100644 index 000000000..4ab414021 --- /dev/null +++ b/migrations/common/input_options.go @@ -0,0 +1,14 @@ +package common + +type InputOptions struct { + Interval string `toml:"interval"` + Precision string `toml:"precision"` + CollectionJitter string `toml:"collection_jitter"` + CollectionOffset string `toml:"collection_offset"` + NamePrefix string `toml:"name_prefix"` + NameSuffix string `toml:"name_suffix"` + NameOverride string `toml:"name_override"` + Alias string `toml:"alias"` + Tags map[string]string `toml:"tags"` + FilterOptions +} diff --git a/migrations/inputs_cassandra/README.md b/migrations/inputs_cassandra/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/migrations/inputs_cassandra/migration.go b/migrations/inputs_cassandra/migration.go new file mode 100644 index 000000000..5dd962775 --- /dev/null +++ b/migrations/inputs_cassandra/migration.go @@ -0,0 +1,247 @@ +package inputs_cassandra + +import ( + "fmt" + "net/url" + "sort" + "strings" + + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + + "github.com/influxdata/telegraf/migrations" + "github.com/influxdata/telegraf/migrations/common" +) + +// Define "old" data structure +type cassandra struct { + Context string `toml:"context"` + Servers []string `toml:"servers"` + Metrics []string `toml:"metrics"` + common.InputOptions +} + +// Define "new" data structure(s) +type metricConfig struct { + Name string `toml:"name"` + Mbean string `toml:"mbean"` + FieldPrefix *string `toml:"field_prefix,omitempty"` + TagKeys []string `toml:"tag_keys,omitempty"` +} + +type jolokiaAgent struct { + URLs []string `toml:"urls"` + Username string `toml:"username,omitempty"` + Password string `toml:"password,omitempty"` + Metrics []metricConfig `toml:"metric"` + + // Common options + Interval string `toml:"interval,omitempty"` + Precision string `toml:"precision,omitempty"` + CollectionJitter string `toml:"collection_jitter,omitempty"` + CollectionOffset string `toml:"collection_offset,omitempty"` + NamePrefix string `toml:"name_prefix,omitempty"` + NameSuffix string `toml:"name_suffix,omitempty"` + NameOverride string `toml:"name_override,omitempty"` + Alias string `toml:"alias,omitempty"` + Tags map[string]string `toml:"tags,omitempty"` + + NamePass []string `toml:"namepass,omitempty"` + NameDrop []string `toml:"namedrop,omitempty"` + FieldPass []string `toml:"fieldpass,omitempty"` + FieldDrop []string `toml:"fielddrop,omitempty"` + TagPassFilters map[string][]string `toml:"tagpass,omitempty"` + TagDropFilters map[string][]string `toml:"tagdrop,omitempty"` + TagExclude []string `toml:"tagexclude,omitempty"` + TagInclude []string `toml:"taginclude,omitempty"` + MetricPass string `toml:"metricpass,omitempty"` +} + +// Migration function +func migrate(tbl *ast.Table) ([]byte, error) { + // Decode the old data structure + var old cassandra + if err := toml.UnmarshalTable(tbl, &old); err != nil { + return nil, err + } + + // Collect servers that use the same credentials + endpoints := make(map[string]jolokiaAgent) + for _, server := range old.Servers { + u, err := url.Parse("http://" + server) + if err != nil { + return nil, fmt.Errorf("invalid url %q: %w", server, err) + } + if u.Path != "" { + return nil, fmt.Errorf("unexpected path in %q: %w", server, err) + } + if u.Hostname() == "" { + u.Host = "localhost:" + u.Port() + } + user := u.User.Username() + passwd, _ := u.User.Password() + key := user + ":" + passwd + + endpoint, found := endpoints[key] + if !found { + endpoint = jolokiaAgent{ + Username: user, + Password: passwd, + } + endpoint.fillCommon(old.InputOptions) + } + u.User = nil + endpoint.URLs = append(endpoint.URLs, u.String()) + endpoints[key] = endpoint + } + + // Create new-style metrics according to the old config + var javaMetrics []metricConfig + var cassandraMetrics []metricConfig + for _, metric := range old.Metrics { + bean := strings.TrimPrefix(metric, "/") + + params := make(map[string]string) + parts := strings.SplitN(bean, ":", 2) + for _, p := range strings.Split(parts[1], ",") { + x := strings.SplitN(p, "=", 2) + params[x[0]] = x[1] + } + + name, found := params["type"] + if !found { + return nil, fmt.Errorf("cannot determine name for metric %q", metric) + } + name = strings.SplitN(name, "/", 2)[0] + + var tagKeys []string + var prefix *string + for k := range params { + switch k { + case "name", "scope", "path", "keyspace": + tagKeys = append(tagKeys, k) + } + } + sort.Strings(tagKeys) + for i, k := range tagKeys { + if k == "name" { + p := fmt.Sprintf("$%d_", i+1) + prefix = &p + break + } + } + + switch { + case strings.HasPrefix(bean, "java.lang:"): + javaMetrics = append(javaMetrics, metricConfig{ + Name: name, + Mbean: bean, + TagKeys: tagKeys, + FieldPrefix: prefix, + }) + case strings.HasPrefix(bean, "org.apache.cassandra.metrics:"): + cassandraMetrics = append(cassandraMetrics, metricConfig{ + Name: name, + Mbean: bean, + TagKeys: tagKeys, + FieldPrefix: prefix, + }) + default: + return nil, fmt.Errorf("unknown java metric %q", metric) + } + } + + // Create the corresponding metric configurations + cfg := migrations.CreateTOMLStruct("inputs", "jolokia2_agent") + for _, endpoint := range endpoints { + if len(javaMetrics) > 0 { + plugin := jolokiaAgent{ + URLs: endpoint.URLs, + Username: endpoint.Username, + Password: endpoint.Password, + Metrics: javaMetrics, + } + plugin.fillCommon(old.InputOptions) + plugin.NamePrefix = "java" + cfg.Add("inputs", "jolokia2_agent", plugin) + } + if len(cassandraMetrics) > 0 { + plugin := jolokiaAgent{ + URLs: endpoint.URLs, + Username: endpoint.Username, + Password: endpoint.Password, + Metrics: cassandraMetrics, + } + plugin.fillCommon(old.InputOptions) + plugin.NamePrefix = "cassandra" + + cfg.Add("inputs", "jolokia2_agent", plugin) + } + } + + // Marshal the new configuration + buf, err := toml.Marshal(cfg) + if err != nil { + return nil, err + } + buf = append(buf, []byte("\n")...) + + // Create the new content to output + return buf, nil +} + +func (j *jolokiaAgent) fillCommon(o common.InputOptions) { + j.Interval = o.Interval + j.Precision = o.Precision + j.CollectionJitter = o.CollectionJitter + j.CollectionOffset = o.CollectionOffset + j.NamePrefix = o.NamePrefix + j.NameSuffix = o.NameSuffix + j.NameOverride = o.NameOverride + j.Alias = o.Alias + if len(o.Tags) > 0 { + j.Tags = make(map[string]string, len(o.Tags)) + for k, v := range o.Tags { + j.Tags[k] = v + } + } + + if len(o.NamePass) > 0 { + j.NamePass = append(j.NamePass, o.NamePass...) + } + if len(o.NameDrop) > 0 { + j.NameDrop = append(j.NameDrop, o.NameDrop...) + } + if len(o.FieldPass) > 0 || len(o.FieldDropOld) > 0 { + j.FieldPass = append(j.FieldPass, o.FieldPass...) + j.FieldPass = append(j.FieldPass, o.FieldPassOld...) + } + if len(o.FieldDrop) > 0 || len(o.FieldDropOld) > 0 { + j.FieldDrop = append(j.FieldDrop, o.FieldDrop...) + j.FieldDrop = append(j.FieldDrop, o.FieldDropOld...) + } + if len(o.TagPassFilters) > 0 { + j.TagPassFilters = make(map[string][]string, len(o.TagPassFilters)) + for k, v := range o.TagPassFilters { + j.TagPassFilters[k] = v + } + } + if len(o.TagDropFilters) > 0 { + j.TagDropFilters = make(map[string][]string, len(o.TagDropFilters)) + for k, v := range o.TagDropFilters { + j.TagDropFilters[k] = v + } + } + if len(o.TagExclude) > 0 { + j.TagExclude = append(j.TagExclude, o.TagExclude...) + } + if len(o.TagInclude) > 0 { + j.TagInclude = append(j.TagInclude, o.TagInclude...) + } + j.MetricPass = o.MetricPass +} + +// Register the migration function for the plugin type +func init() { + migrations.AddPluginMigration("inputs.cassandra", migrate) +} diff --git a/migrations/inputs_cassandra/migration_test.go b/migrations/inputs_cassandra/migration_test.go new file mode 100644 index 000000000..a454376a3 --- /dev/null +++ b/migrations/inputs_cassandra/migration_test.go @@ -0,0 +1,61 @@ +package inputs_cassandra_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + _ "github.com/influxdata/telegraf/migrations/inputs_cassandra" // register migration + _ "github.com/influxdata/telegraf/plugins/inputs/jolokia2_agent" // register plugin +) + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + inputFile := filepath.Join(testcasePath, "telegraf.conf") + expectedFile := filepath.Join(testcasePath, "expected.conf") + + // Read the expected output + expected := config.NewConfig() + require.NoError(t, expected.LoadConfig(expectedFile)) + require.NotEmpty(t, expected.Inputs) + + // Read the input data + input, remote, err := config.LoadConfigFile(inputFile) + require.NoError(t, err) + require.False(t, remote) + require.NotEmpty(t, input) + + // Migrate + output, n, err := config.ApplyMigrations(input) + require.NoError(t, err) + require.NotEmpty(t, output) + require.GreaterOrEqual(t, n, uint64(1)) + actual := config.NewConfig() + require.NoError(t, actual.LoadConfigData(output)) + + // Test the output + require.Len(t, actual.Inputs, len(expected.Inputs)) + actualIDs := make([]string, 0, len(expected.Inputs)) + expectedIDs := make([]string, 0, len(expected.Inputs)) + for i := range actual.Inputs { + actualIDs = append(actualIDs, actual.Inputs[i].ID()) + expectedIDs = append(expectedIDs, expected.Inputs[i].ID()) + } + require.ElementsMatch(t, expectedIDs, actualIDs) + }) + } +} diff --git a/migrations/inputs_cassandra/testcases/filter_options/expected.conf b/migrations/inputs_cassandra/testcases/filter_options/expected.conf new file mode 100644 index 000000000..83354092f --- /dev/null +++ b/migrations/inputs_cassandra/testcases/filter_options/expected.conf @@ -0,0 +1,55 @@ +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.1:8778"] +username = "myuser" +password = "mypassword" +name_prefix = "java" + +[[inputs.jolokia2_agent.metric]] +name = "Memory" +mbean = "java.lang:type=Memory/HeapMemoryUsage" + +[inputs.jolokia2_agent.tagdrop] +app = ["myapp"] +location = ["e*"] + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.1:8778"] +username = "myuser" +password = "mypassword" +name_prefix = "cassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Table" +mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" +field_prefix = "$2_" +tag_keys = ["keyspace", "name", "scope"] + +[inputs.jolokia2_agent.tagdrop] +app = ["myapp"] +location = ["e*"] + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.2:8778", "http://localhost:8778"] +name_prefix = "java" + +[[inputs.jolokia2_agent.metric]] +name = "Memory" +mbean = "java.lang:type=Memory/HeapMemoryUsage" + +[inputs.jolokia2_agent.tagdrop] +app = ["myapp"] +location = ["e*"] + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.2:8778", "http://localhost:8778"] +name_prefix = "cassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Table" +mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" +field_prefix = "$2_" +tag_keys = ["keyspace", "name", "scope"] + +[inputs.jolokia2_agent.tagdrop] +app = ["myapp"] +location = ["e*"] diff --git a/migrations/inputs_cassandra/testcases/filter_options/telegraf.conf b/migrations/inputs_cassandra/testcases/filter_options/telegraf.conf new file mode 100644 index 000000000..c2557231c --- /dev/null +++ b/migrations/inputs_cassandra/testcases/filter_options/telegraf.conf @@ -0,0 +1,11 @@ +[[inputs.cassandra]] + context = "/jolokia/read" + servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"] + metrics = [ + "/java.lang:type=Memory/HeapMemoryUsage", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" + ] + + [inputs.cassandra.tagdrop] + app = ["myapp"] + location = ["e*"] diff --git a/migrations/inputs_cassandra/testcases/general_options/expected.conf b/migrations/inputs_cassandra/testcases/general_options/expected.conf new file mode 100644 index 000000000..16e58c448 --- /dev/null +++ b/migrations/inputs_cassandra/testcases/general_options/expected.conf @@ -0,0 +1,64 @@ +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.1:8778"] +username = "myuser" +password = "mypassword" +interval = "20s" +name_prefix = "java" +alias = "mycassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Memory" +mbean = "java.lang:type=Memory/HeapMemoryUsage" + +[inputs.jolokia2_agent.tags] +app = "myapp" +location = "east" + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.1:8778"] +username = "myuser" +password = "mypassword" +interval = "20s" +name_prefix = "cassandra" +alias = "mycassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Table" +mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" +field_prefix = "$2_" +tag_keys = ["keyspace", "name", "scope"] + +[inputs.jolokia2_agent.tags] +app = "myapp" +location = "east" + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.2:8778", "http://localhost:8778"] +interval = "20s" +name_prefix = "java" +alias = "mycassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Memory" +mbean = "java.lang:type=Memory/HeapMemoryUsage" + +[inputs.jolokia2_agent.tags] +app = "myapp" +location = "east" + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.2:8778", "http://localhost:8778"] +interval = "20s" +name_prefix = "cassandra" +alias = "mycassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Table" +mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" +field_prefix = "$2_" +tag_keys = ["keyspace", "name", "scope"] + +[inputs.jolokia2_agent.tags] +app = "myapp" +location = "east" + diff --git a/migrations/inputs_cassandra/testcases/general_options/telegraf.conf b/migrations/inputs_cassandra/testcases/general_options/telegraf.conf new file mode 100644 index 000000000..bb3a2870b --- /dev/null +++ b/migrations/inputs_cassandra/testcases/general_options/telegraf.conf @@ -0,0 +1,13 @@ +[[inputs.cassandra]] + interval = "20s" + alias = "mycassandra" + context = "/jolokia/read" + servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"] + metrics = [ + "/java.lang:type=Memory/HeapMemoryUsage", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" + ] + + [inputs.cassandra.tags] + app = "myapp" + location = "east" diff --git a/migrations/inputs_cassandra/testcases/simple/expected.conf b/migrations/inputs_cassandra/testcases/simple/expected.conf new file mode 100644 index 000000000..428469094 --- /dev/null +++ b/migrations/inputs_cassandra/testcases/simple/expected.conf @@ -0,0 +1,40 @@ +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.1:8778"] +username = "myuser" +password = "mypassword" +name_prefix = "java" + +[[inputs.jolokia2_agent.metric]] +name = "Memory" +mbean = "java.lang:type=Memory/HeapMemoryUsage" + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.1:8778"] +username = "myuser" +password = "mypassword" +name_prefix = "cassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Table" +mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" +field_prefix = "$2_" +tag_keys = ["keyspace", "name", "scope"] + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.2:8778", "http://localhost:8778"] +name_prefix = "java" + +[[inputs.jolokia2_agent.metric]] +name = "Memory" +mbean = "java.lang:type=Memory/HeapMemoryUsage" + +[[inputs.jolokia2_agent]] +urls = ["http://10.10.10.2:8778", "http://localhost:8778"] +name_prefix = "cassandra" + +[[inputs.jolokia2_agent.metric]] +name = "Table" +mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" +field_prefix = "$2_" +tag_keys = ["keyspace", "name", "scope"] + diff --git a/migrations/inputs_cassandra/testcases/simple/telegraf.conf b/migrations/inputs_cassandra/testcases/simple/telegraf.conf new file mode 100644 index 000000000..242f7ea81 --- /dev/null +++ b/migrations/inputs_cassandra/testcases/simple/telegraf.conf @@ -0,0 +1,14 @@ +[[inputs.cassandra]] + context = "/jolokia/read" + ## List of cassandra servers exposing jolokia read service + servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"] + ## List of metrics collected on above servers + ## Each metric consists of a jmx path. + ## This will collect all heap memory usage metrics from the jvm and + ## ReadLatency metrics for all keyspaces and tables. + ## "type=Table" in the query works with Cassandra3.0. Older versions might + ## need to use "type=ColumnFamily" + metrics = [ + "/java.lang:type=Memory/HeapMemoryUsage", + "/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency" + ] diff --git a/migrations/registry.go b/migrations/registry.go new file mode 100644 index 000000000..b399b0fc5 --- /dev/null +++ b/migrations/registry.go @@ -0,0 +1,33 @@ +package migrations + +import ( + "fmt" + + "github.com/influxdata/toml/ast" +) + +type PluginMigrationFunc func(*ast.Table) ([]byte, error) + +var PluginMigrations = make(map[string]PluginMigrationFunc) + +func AddPluginMigration(name string, f PluginMigrationFunc) { + if _, found := PluginMigrations[name]; found { + panic(fmt.Errorf("plugin migration function already registered for %q", name)) + } + PluginMigrations[name] = f +} + +type pluginTOMLStruct map[string]map[string][]interface{} + +func CreateTOMLStruct(category, name string) pluginTOMLStruct { + return map[string]map[string][]interface{}{ + category: { + name: make([]interface{}, 0), + }, + } +} + +func (p *pluginTOMLStruct) Add(category, name string, plugin interface{}) { + cfg := map[string]map[string][]interface{}(*p) + cfg[category][name] = append(cfg[category][name], plugin) +} diff --git a/tools/custom_builder/config.go b/tools/custom_builder/config.go index 29748a044..62825032e 100644 --- a/tools/custom_builder/config.go +++ b/tools/custom_builder/config.go @@ -109,7 +109,7 @@ func (s *selection) Filter(p packageCollection) *packageCollection { func (s *selection) importFiles(configurations []string) error { for _, cfg := range configurations { - buf, err := config.LoadConfigFile(cfg) + buf, _, err := config.LoadConfigFile(cfg) if err != nil { return fmt.Errorf("reading %q failed: %w", cfg, err) } diff --git a/tools/custom_builder/main.go b/tools/custom_builder/main.go index 774d0d549..1892b9461 100644 --- a/tools/custom_builder/main.go +++ b/tools/custom_builder/main.go @@ -63,7 +63,7 @@ func usage() { } func main() { - var dryrun, showtags, quiet bool + var dryrun, showtags, migrations, quiet bool var configFiles, configDirs []string flag.Func("config", @@ -82,6 +82,7 @@ func main() { ) flag.BoolVar(&dryrun, "dry-run", false, "Skip the actual building step") flag.BoolVar(&quiet, "quiet", false, "Print fewer log messages") + flag.BoolVar(&migrations, "migrations", false, "Include configuration migrations") flag.BoolVar(&showtags, "tags", false, "Show build-tags used") flag.Usage = usage @@ -125,7 +126,11 @@ func main() { if len(tagset) == 0 { log.Fatalln("Nothing selected!") } - tags := "custom," + strings.Join(tagset, ",") + tags := "custom," + if migrations { + tags += "migrations," + } + tags += strings.Join(tagset, ",") if showtags { fmt.Printf("Build tags: %s\n", tags) }