feat(config): Add framework for migrating deprecated plugins (#13377)

This commit is contained in:
Sven Rebhan 2023-06-09 10:10:09 +02:00 committed by GitHub
parent ebe346103e
commit 16786d2977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 974 additions and 45 deletions

174
cmd/telegraf/cmd_config.go Normal file
View File

@ -0,0 +1,174 @@
// Command handling for configuration "config" command
package main
import (
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
"path/filepath"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/migrations"
"github.com/urfave/cli/v2"
)
func getConfigCommands(pluginFilterFlags []cli.Flag, outputBuffer io.Writer) []*cli.Command {
return []*cli.Command{
{
Name: "config",
Usage: "commands for generating and migrating configurations",
Flags: pluginFilterFlags,
Action: func(cCtx *cli.Context) error {
// The sub_Filters are populated when the filter flags are set after the subcommand config
// e.g. telegraf config --section-filter inputs
filters := processFilterFlags(cCtx)
printSampleConfig(outputBuffer, filters)
return nil
},
Subcommands: []*cli.Command{
{
Name: "create",
Usage: "create a full sample configuration and show it",
Description: `
The 'create' produces a full configuration containing all plugins as an example
and shows it on the console. You may apply 'section' or 'plugin' filtering
to reduce the output to the plugins you need
Create the full configuration
> telegraf config create
To produce a configuration only containing a Modbus input plugin and an
InfluxDB v2 output plugin use
> telegraf config create --section-filter "inputs:outputs" --input-filter "modbus" --output-filter "influxdb_v2"
`,
Flags: pluginFilterFlags,
Action: func(cCtx *cli.Context) error {
filters := processFilterFlags(cCtx)
printSampleConfig(outputBuffer, filters)
return nil
},
},
{
Name: "migrate",
Usage: "migrate deprecated plugins and options of the configuration(s)",
Description: `
The 'migrate' command reads the configuration files specified via '--config' or
'--config-directory' and tries to migrate plugins or options that are currently
deprecated using the recommended replacements. If no configuration file is
explicitly specified the command reads the default locations and uses those
configuration files. Migrated files are stored with a '.migrated' suffix at the
location of the inputs. If you are migrating remote configurations the migrated
configurations is stored in the current directory using the filename of the URL
with a '.migrated' suffix.
It is highly recommended to test those migrated configurations before using
those files unattended!
To migrate the file 'mysettings.conf' use
> telegraf --config mysettings.conf config migrate
`,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "force",
Usage: "forces overwriting of an existing migration file",
},
},
Action: func(cCtx *cli.Context) error {
// Setup logging
telegraf.Debug = cCtx.Bool("debug")
logConfig := logger.LogConfig{Debug: telegraf.Debug}
if err := logger.SetupLogging(logConfig); err != nil {
return err
}
// Check if we have migrations at all. There might be
// none if you run a custom build without migrations
// enabled.
if len(migrations.PluginMigrations) == 0 {
return errors.New("no migrations available")
}
log.Printf("%d plugin migration(s) available", len(migrations.PluginMigrations))
// Collect the given configuration files
configFiles := cCtx.StringSlice("config")
configDir := cCtx.StringSlice("config-directory")
for _, fConfigDirectory := range configDir {
files, err := config.WalkDirectory(fConfigDirectory)
if err != nil {
return err
}
configFiles = append(configFiles, files...)
}
// If no "config" or "config-directory" flag(s) was
// provided we should load default configuration files
if len(configFiles) == 0 {
paths, err := config.GetDefaultConfigPath()
if err != nil {
return err
}
configFiles = paths
}
for _, fn := range configFiles {
log.Printf("D! Trying to migrate %q...", fn)
// Read and parse the config file
data, remote, err := config.LoadConfigFile(fn)
if err != nil {
return fmt.Errorf("opening input %q failed: %w", fn, err)
}
out, applied, err := config.ApplyMigrations(data)
if err != nil {
return err
}
// Do not write a migration file if nothing was done
if applied == 0 {
log.Printf("I! No migration applied for %q", fn)
continue
}
// Construct the output filename
// For remote locations we just save the filename
// with the migrated suffix.
outfn := fn + ".migrated"
if remote {
u, err := url.Parse(fn)
if err != nil {
return fmt.Errorf("parsing remote config URL %q failed: %w", fn, err)
}
outfn = filepath.Base(u.Path) + ".migrated"
}
log.Printf("I! %d migration applied for %q, writing result as %q", applied, fn, outfn)
// Make sure the file does not exist yet if we should not overwrite
if !cCtx.Bool("force") {
if _, err := os.Stat(outfn); !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("output file %q already exists", outfn)
}
}
// Write the output file
if err := os.WriteFile(outfn, out, 0640); err != nil {
return fmt.Errorf("writing output %q failed: %w", outfn, err)
}
}
return nil
},
},
},
},
}
}

View File

@ -1,4 +1,4 @@
// Command handling for secret-stores' "secret" command
// Command handling for secret-stores' "secrets" command
package main
import (

View File

@ -245,6 +245,11 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
return m.Run()
}
commands := append(
getConfigCommands(pluginFilterFlags, outputBuffer),
getSecretStoreCommands(m)...,
)
app := &cli.App{
Name: "Telegraf",
Usage: "The plugin-driven server agent for collecting & reporting metrics.",
@ -341,19 +346,6 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
}, extraFlags...),
Action: action,
Commands: append([]*cli.Command{
{
Name: "config",
Usage: "print out full sample configuration to stdout",
Flags: pluginFilterFlags,
Action: func(cCtx *cli.Context) error {
// The sub_Filters are populated when the filter flags are set after the subcommand config
// e.g. telegraf config --section-filter inputs
filters := processFilterFlags(cCtx)
printSampleConfig(outputBuffer, filters)
return nil
},
},
{
Name: "version",
Usage: "print current version to stdout",
@ -362,9 +354,7 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
return nil
},
},
},
getSecretStoreCommands(m)...,
),
}, commands...),
}
// Make sure we safely erase secrets

View File

@ -376,7 +376,7 @@ func WalkDirectory(path string) ([]string, error) {
// 1. $TELEGRAF_CONFIG_PATH
// 2. $HOME/.telegraf/telegraf.conf
// 3. /etc/telegraf/telegraf.conf and /etc/telegraf/telegraf.d/*.conf
func getDefaultConfigPath() ([]string, error) {
func GetDefaultConfigPath() ([]string, error) {
envfile := os.Getenv("TELEGRAF_CONFIG_PATH")
homefile := os.ExpandEnv("${HOME}/.telegraf/telegraf.conf")
etcfile := "/etc/telegraf/telegraf.conf"
@ -434,7 +434,7 @@ func (c *Config) LoadConfig(path string) error {
paths := []string{}
if path == "" {
if paths, err = getDefaultConfigPath(); err != nil {
if paths, err = GetDefaultConfigPath(); err != nil {
return err
}
} else {
@ -446,7 +446,7 @@ func (c *Config) LoadConfig(path string) error {
log.Printf("I! Loading config: %s", path)
}
data, err := LoadConfigFile(path)
data, _, err := LoadConfigFile(path)
if err != nil {
return fmt.Errorf("error loading config file %s: %w", path, err)
}
@ -696,33 +696,37 @@ func trimBOM(f []byte) []byte {
return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}
func LoadConfigFile(config string) ([]byte, error) {
// LoadConfigFile loads the content of a configuration file and returns it
// together with a flag denoting if the file is from a remote location such
// as a web server.
func LoadConfigFile(config string) ([]byte, bool, error) {
if fetchURLRe.MatchString(config) {
u, err := url.Parse(config)
if err != nil {
return nil, err
return nil, true, err
}
switch u.Scheme {
case "https", "http":
return fetchConfig(u)
data, err := fetchConfig(u)
return data, true, err
default:
return nil, fmt.Errorf("scheme %q not supported", u.Scheme)
return nil, true, fmt.Errorf("scheme %q not supported", u.Scheme)
}
}
// If it isn't a https scheme, try it as a file
buffer, err := os.ReadFile(config)
if err != nil {
return nil, err
return nil, false, err
}
mimeType := http.DetectContentType(buffer)
if !strings.Contains(mimeType, "text/plain") {
return nil, fmt.Errorf("provided config is not a TOML file: %s", config)
return nil, false, fmt.Errorf("provided config is not a TOML file: %s", config)
}
return buffer, nil
return buffer, false, nil
}
func fetchConfig(u *url.URL) ([]byte, error) {

View File

@ -3,6 +3,8 @@ package config_test
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
@ -441,6 +443,21 @@ func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) {
}
}
func TestGetDefaultConfigPathFromEnvURL(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
c := config.NewConfig()
t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL)
configPath, err := config.GetDefaultConfigPath()
require.NoError(t, err)
require.Equal(t, []string{ts.URL}, configPath)
err = c.LoadConfig("")
require.NoError(t, err)
}
func TestConfig_URLLikeFileName(t *testing.T) {
c := config.NewConfig()
err := c.LoadConfig("http:##www.example.com.conf")

View File

@ -135,18 +135,3 @@ func TestURLRetries3FailsThenPasses(t *testing.T) {
require.NoError(t, c.LoadConfig(ts.URL))
require.Equal(t, 4, responseCounter)
}
func TestConfig_getDefaultConfigPathFromEnvURL(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
c := NewConfig()
t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL)
configPath, err := getDefaultConfigPath()
require.NoError(t, err)
require.Equal(t, []string{ts.URL}, configPath)
err = c.LoadConfig("")
require.NoError(t, err)
}

181
config/migration.go Normal file
View File

@ -0,0 +1,181 @@
package config
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"log"
"sort"
"strings"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
"github.com/influxdata/telegraf/migrations"
_ "github.com/influxdata/telegraf/migrations/all" // register all migrations
)
type section struct {
name string
begin int
content *ast.Table
raw *bytes.Buffer
}
func splitToSections(root *ast.Table) ([]section, error) {
var sections []section
for name, elements := range root.Fields {
switch name {
case "inputs", "outputs", "processors", "aggregators":
category, ok := elements.(*ast.Table)
if !ok {
return nil, fmt.Errorf("%q is not a table (%T)", name, category)
}
for plugin, elements := range category.Fields {
tbls, ok := elements.([]*ast.Table)
if !ok {
return nil, fmt.Errorf("elements of \"%s.%s\" is not a list of tables (%T)", name, plugin, elements)
}
for _, tbl := range tbls {
s := section{
name: name + "." + tbl.Name,
begin: tbl.Line,
content: tbl,
raw: &bytes.Buffer{},
}
sections = append(sections, s)
}
}
default:
tbl, ok := elements.(*ast.Table)
if !ok {
return nil, fmt.Errorf("%q is not a table (%T)", name, elements)
}
s := section{
name: name,
begin: tbl.Line,
content: tbl,
raw: &bytes.Buffer{},
}
sections = append(sections, s)
}
}
// Sort the TOML elements by begin (line-number)
sort.SliceStable(sections, func(i, j int) bool { return sections[i].begin < sections[j].begin })
return sections, nil
}
func assignTextToSections(data []byte, sections []section) ([]section, error) {
// Now assign the raw text to each section
if sections[0].begin > 0 {
sections = append([]section{{
name: "header",
begin: 0,
raw: &bytes.Buffer{},
}}, sections...)
}
var lineno int
scanner := bufio.NewScanner(bytes.NewBuffer(data))
for idx, next := range sections[1:] {
var buf bytes.Buffer
for lineno < next.begin-1 {
if !scanner.Scan() {
break
}
lineno++
line := strings.TrimSpace(scanner.Text())
if strings.HasPrefix(line, "#") {
_, _ = buf.Write(scanner.Bytes())
_, _ = buf.WriteString("\n")
continue
} else if buf.Len() > 0 {
if _, err := io.Copy(sections[idx].raw, &buf); err != nil {
return nil, fmt.Errorf("copying buffer failed: %w", err)
}
buf.Reset()
}
_, _ = sections[idx].raw.Write(scanner.Bytes())
_, _ = sections[idx].raw.WriteString("\n")
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("splitting by line failed: %w", err)
}
// If a comment is directly in front of the next section, without
// newline, the comment is assigned to the next section.
if buf.Len() > 0 {
if _, err := io.Copy(sections[idx+1].raw, &buf); err != nil {
return nil, fmt.Errorf("copying buffer failed: %w", err)
}
buf.Reset()
}
}
// Write the remaining to the last section
for scanner.Scan() {
_, _ = sections[len(sections)-1].raw.Write(scanner.Bytes())
_, _ = sections[len(sections)-1].raw.WriteString("\n")
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("splitting by line failed: %w", err)
}
return sections, nil
}
func ApplyMigrations(data []byte) ([]byte, uint64, error) {
root, err := toml.Parse(data)
if err != nil {
return nil, 0, fmt.Errorf("parsing failed: %w", err)
}
// Split the configuration into sections containing the location
// in the file.
sections, err := splitToSections(root)
if err != nil {
return nil, 0, fmt.Errorf("splitting to sections failed: %w", err)
}
if len(sections) == 0 {
return nil, 0, errors.New("no TOML configuration found")
}
// Assign the configuration text to the corresponding segments
sections, err = assignTextToSections(data, sections)
if err != nil {
return nil, 0, fmt.Errorf("assigning text failed: %w", err)
}
// Do the actual migration(s)
var applied uint64
for idx, s := range sections {
migrate, found := migrations.PluginMigrations[s.name]
if !found {
continue
}
log.Printf("D! migrating plugin %q in line %d...", s.name, s.begin)
result, err := migrate(s.content)
if err != nil {
return nil, 0, fmt.Errorf("migrating %q (line %d) failed: %w", s.name, s.begin, err)
}
s.raw = bytes.NewBuffer(result)
sections[idx] = s
applied++
}
var buf bytes.Buffer
for _, s := range sections {
_, err = s.raw.WriteTo(&buf)
if err != nil {
return nil, applied, fmt.Errorf("joining output failed: %w", err)
}
}
return buf.Bytes(), applied, nil
}

1
migrations/all/all.go Normal file
View File

@ -0,0 +1 @@
package all

View File

@ -0,0 +1,5 @@
//go:build !custom || (migrations && (inputs || inputs.cassandra))
package all
import _ "github.com/influxdata/telegraf/migrations/inputs_cassandra" // register migration

View File

@ -0,0 +1,15 @@
package common
type FilterOptions struct {
NamePass []string `toml:"namepass"`
NameDrop []string `toml:"namedrop"`
FieldPassOld []string `toml:"pass"`
FieldPass []string `toml:"fieldpass"`
FieldDropOld []string `toml:"drop"`
FieldDrop []string `toml:"fielddrop"`
TagPassFilters map[string][]string `toml:"tagpass"`
TagDropFilters map[string][]string `toml:"tagdrop"`
TagExclude []string `toml:"tagexclude"`
TagInclude []string `toml:"taginclude"`
MetricPass string `toml:"metricpass"`
}

View File

@ -0,0 +1,14 @@
package common
type InputOptions struct {
Interval string `toml:"interval"`
Precision string `toml:"precision"`
CollectionJitter string `toml:"collection_jitter"`
CollectionOffset string `toml:"collection_offset"`
NamePrefix string `toml:"name_prefix"`
NameSuffix string `toml:"name_suffix"`
NameOverride string `toml:"name_override"`
Alias string `toml:"alias"`
Tags map[string]string `toml:"tags"`
FilterOptions
}

View File

View File

@ -0,0 +1,247 @@
package inputs_cassandra
import (
"fmt"
"net/url"
"sort"
"strings"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
"github.com/influxdata/telegraf/migrations"
"github.com/influxdata/telegraf/migrations/common"
)
// Define "old" data structure
type cassandra struct {
Context string `toml:"context"`
Servers []string `toml:"servers"`
Metrics []string `toml:"metrics"`
common.InputOptions
}
// Define "new" data structure(s)
type metricConfig struct {
Name string `toml:"name"`
Mbean string `toml:"mbean"`
FieldPrefix *string `toml:"field_prefix,omitempty"`
TagKeys []string `toml:"tag_keys,omitempty"`
}
type jolokiaAgent struct {
URLs []string `toml:"urls"`
Username string `toml:"username,omitempty"`
Password string `toml:"password,omitempty"`
Metrics []metricConfig `toml:"metric"`
// Common options
Interval string `toml:"interval,omitempty"`
Precision string `toml:"precision,omitempty"`
CollectionJitter string `toml:"collection_jitter,omitempty"`
CollectionOffset string `toml:"collection_offset,omitempty"`
NamePrefix string `toml:"name_prefix,omitempty"`
NameSuffix string `toml:"name_suffix,omitempty"`
NameOverride string `toml:"name_override,omitempty"`
Alias string `toml:"alias,omitempty"`
Tags map[string]string `toml:"tags,omitempty"`
NamePass []string `toml:"namepass,omitempty"`
NameDrop []string `toml:"namedrop,omitempty"`
FieldPass []string `toml:"fieldpass,omitempty"`
FieldDrop []string `toml:"fielddrop,omitempty"`
TagPassFilters map[string][]string `toml:"tagpass,omitempty"`
TagDropFilters map[string][]string `toml:"tagdrop,omitempty"`
TagExclude []string `toml:"tagexclude,omitempty"`
TagInclude []string `toml:"taginclude,omitempty"`
MetricPass string `toml:"metricpass,omitempty"`
}
// Migration function
func migrate(tbl *ast.Table) ([]byte, error) {
// Decode the old data structure
var old cassandra
if err := toml.UnmarshalTable(tbl, &old); err != nil {
return nil, err
}
// Collect servers that use the same credentials
endpoints := make(map[string]jolokiaAgent)
for _, server := range old.Servers {
u, err := url.Parse("http://" + server)
if err != nil {
return nil, fmt.Errorf("invalid url %q: %w", server, err)
}
if u.Path != "" {
return nil, fmt.Errorf("unexpected path in %q: %w", server, err)
}
if u.Hostname() == "" {
u.Host = "localhost:" + u.Port()
}
user := u.User.Username()
passwd, _ := u.User.Password()
key := user + ":" + passwd
endpoint, found := endpoints[key]
if !found {
endpoint = jolokiaAgent{
Username: user,
Password: passwd,
}
endpoint.fillCommon(old.InputOptions)
}
u.User = nil
endpoint.URLs = append(endpoint.URLs, u.String())
endpoints[key] = endpoint
}
// Create new-style metrics according to the old config
var javaMetrics []metricConfig
var cassandraMetrics []metricConfig
for _, metric := range old.Metrics {
bean := strings.TrimPrefix(metric, "/")
params := make(map[string]string)
parts := strings.SplitN(bean, ":", 2)
for _, p := range strings.Split(parts[1], ",") {
x := strings.SplitN(p, "=", 2)
params[x[0]] = x[1]
}
name, found := params["type"]
if !found {
return nil, fmt.Errorf("cannot determine name for metric %q", metric)
}
name = strings.SplitN(name, "/", 2)[0]
var tagKeys []string
var prefix *string
for k := range params {
switch k {
case "name", "scope", "path", "keyspace":
tagKeys = append(tagKeys, k)
}
}
sort.Strings(tagKeys)
for i, k := range tagKeys {
if k == "name" {
p := fmt.Sprintf("$%d_", i+1)
prefix = &p
break
}
}
switch {
case strings.HasPrefix(bean, "java.lang:"):
javaMetrics = append(javaMetrics, metricConfig{
Name: name,
Mbean: bean,
TagKeys: tagKeys,
FieldPrefix: prefix,
})
case strings.HasPrefix(bean, "org.apache.cassandra.metrics:"):
cassandraMetrics = append(cassandraMetrics, metricConfig{
Name: name,
Mbean: bean,
TagKeys: tagKeys,
FieldPrefix: prefix,
})
default:
return nil, fmt.Errorf("unknown java metric %q", metric)
}
}
// Create the corresponding metric configurations
cfg := migrations.CreateTOMLStruct("inputs", "jolokia2_agent")
for _, endpoint := range endpoints {
if len(javaMetrics) > 0 {
plugin := jolokiaAgent{
URLs: endpoint.URLs,
Username: endpoint.Username,
Password: endpoint.Password,
Metrics: javaMetrics,
}
plugin.fillCommon(old.InputOptions)
plugin.NamePrefix = "java"
cfg.Add("inputs", "jolokia2_agent", plugin)
}
if len(cassandraMetrics) > 0 {
plugin := jolokiaAgent{
URLs: endpoint.URLs,
Username: endpoint.Username,
Password: endpoint.Password,
Metrics: cassandraMetrics,
}
plugin.fillCommon(old.InputOptions)
plugin.NamePrefix = "cassandra"
cfg.Add("inputs", "jolokia2_agent", plugin)
}
}
// Marshal the new configuration
buf, err := toml.Marshal(cfg)
if err != nil {
return nil, err
}
buf = append(buf, []byte("\n")...)
// Create the new content to output
return buf, nil
}
func (j *jolokiaAgent) fillCommon(o common.InputOptions) {
j.Interval = o.Interval
j.Precision = o.Precision
j.CollectionJitter = o.CollectionJitter
j.CollectionOffset = o.CollectionOffset
j.NamePrefix = o.NamePrefix
j.NameSuffix = o.NameSuffix
j.NameOverride = o.NameOverride
j.Alias = o.Alias
if len(o.Tags) > 0 {
j.Tags = make(map[string]string, len(o.Tags))
for k, v := range o.Tags {
j.Tags[k] = v
}
}
if len(o.NamePass) > 0 {
j.NamePass = append(j.NamePass, o.NamePass...)
}
if len(o.NameDrop) > 0 {
j.NameDrop = append(j.NameDrop, o.NameDrop...)
}
if len(o.FieldPass) > 0 || len(o.FieldDropOld) > 0 {
j.FieldPass = append(j.FieldPass, o.FieldPass...)
j.FieldPass = append(j.FieldPass, o.FieldPassOld...)
}
if len(o.FieldDrop) > 0 || len(o.FieldDropOld) > 0 {
j.FieldDrop = append(j.FieldDrop, o.FieldDrop...)
j.FieldDrop = append(j.FieldDrop, o.FieldDropOld...)
}
if len(o.TagPassFilters) > 0 {
j.TagPassFilters = make(map[string][]string, len(o.TagPassFilters))
for k, v := range o.TagPassFilters {
j.TagPassFilters[k] = v
}
}
if len(o.TagDropFilters) > 0 {
j.TagDropFilters = make(map[string][]string, len(o.TagDropFilters))
for k, v := range o.TagDropFilters {
j.TagDropFilters[k] = v
}
}
if len(o.TagExclude) > 0 {
j.TagExclude = append(j.TagExclude, o.TagExclude...)
}
if len(o.TagInclude) > 0 {
j.TagInclude = append(j.TagInclude, o.TagInclude...)
}
j.MetricPass = o.MetricPass
}
// Register the migration function for the plugin type
func init() {
migrations.AddPluginMigration("inputs.cassandra", migrate)
}

View File

@ -0,0 +1,61 @@
package inputs_cassandra_test
import (
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/config"
_ "github.com/influxdata/telegraf/migrations/inputs_cassandra" // register migration
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia2_agent" // register plugin
)
func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}
t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
inputFile := filepath.Join(testcasePath, "telegraf.conf")
expectedFile := filepath.Join(testcasePath, "expected.conf")
// Read the expected output
expected := config.NewConfig()
require.NoError(t, expected.LoadConfig(expectedFile))
require.NotEmpty(t, expected.Inputs)
// Read the input data
input, remote, err := config.LoadConfigFile(inputFile)
require.NoError(t, err)
require.False(t, remote)
require.NotEmpty(t, input)
// Migrate
output, n, err := config.ApplyMigrations(input)
require.NoError(t, err)
require.NotEmpty(t, output)
require.GreaterOrEqual(t, n, uint64(1))
actual := config.NewConfig()
require.NoError(t, actual.LoadConfigData(output))
// Test the output
require.Len(t, actual.Inputs, len(expected.Inputs))
actualIDs := make([]string, 0, len(expected.Inputs))
expectedIDs := make([]string, 0, len(expected.Inputs))
for i := range actual.Inputs {
actualIDs = append(actualIDs, actual.Inputs[i].ID())
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
}
require.ElementsMatch(t, expectedIDs, actualIDs)
})
}
}

View File

@ -0,0 +1,55 @@
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.1:8778"]
username = "myuser"
password = "mypassword"
name_prefix = "java"
[[inputs.jolokia2_agent.metric]]
name = "Memory"
mbean = "java.lang:type=Memory/HeapMemoryUsage"
[inputs.jolokia2_agent.tagdrop]
app = ["myapp"]
location = ["e*"]
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.1:8778"]
username = "myuser"
password = "mypassword"
name_prefix = "cassandra"
[[inputs.jolokia2_agent.metric]]
name = "Table"
mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
field_prefix = "$2_"
tag_keys = ["keyspace", "name", "scope"]
[inputs.jolokia2_agent.tagdrop]
app = ["myapp"]
location = ["e*"]
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.2:8778", "http://localhost:8778"]
name_prefix = "java"
[[inputs.jolokia2_agent.metric]]
name = "Memory"
mbean = "java.lang:type=Memory/HeapMemoryUsage"
[inputs.jolokia2_agent.tagdrop]
app = ["myapp"]
location = ["e*"]
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.2:8778", "http://localhost:8778"]
name_prefix = "cassandra"
[[inputs.jolokia2_agent.metric]]
name = "Table"
mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
field_prefix = "$2_"
tag_keys = ["keyspace", "name", "scope"]
[inputs.jolokia2_agent.tagdrop]
app = ["myapp"]
location = ["e*"]

View File

@ -0,0 +1,11 @@
[[inputs.cassandra]]
context = "/jolokia/read"
servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"]
metrics = [
"/java.lang:type=Memory/HeapMemoryUsage",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
]
[inputs.cassandra.tagdrop]
app = ["myapp"]
location = ["e*"]

View File

@ -0,0 +1,64 @@
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.1:8778"]
username = "myuser"
password = "mypassword"
interval = "20s"
name_prefix = "java"
alias = "mycassandra"
[[inputs.jolokia2_agent.metric]]
name = "Memory"
mbean = "java.lang:type=Memory/HeapMemoryUsage"
[inputs.jolokia2_agent.tags]
app = "myapp"
location = "east"
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.1:8778"]
username = "myuser"
password = "mypassword"
interval = "20s"
name_prefix = "cassandra"
alias = "mycassandra"
[[inputs.jolokia2_agent.metric]]
name = "Table"
mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
field_prefix = "$2_"
tag_keys = ["keyspace", "name", "scope"]
[inputs.jolokia2_agent.tags]
app = "myapp"
location = "east"
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.2:8778", "http://localhost:8778"]
interval = "20s"
name_prefix = "java"
alias = "mycassandra"
[[inputs.jolokia2_agent.metric]]
name = "Memory"
mbean = "java.lang:type=Memory/HeapMemoryUsage"
[inputs.jolokia2_agent.tags]
app = "myapp"
location = "east"
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.2:8778", "http://localhost:8778"]
interval = "20s"
name_prefix = "cassandra"
alias = "mycassandra"
[[inputs.jolokia2_agent.metric]]
name = "Table"
mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
field_prefix = "$2_"
tag_keys = ["keyspace", "name", "scope"]
[inputs.jolokia2_agent.tags]
app = "myapp"
location = "east"

View File

@ -0,0 +1,13 @@
[[inputs.cassandra]]
interval = "20s"
alias = "mycassandra"
context = "/jolokia/read"
servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"]
metrics = [
"/java.lang:type=Memory/HeapMemoryUsage",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
]
[inputs.cassandra.tags]
app = "myapp"
location = "east"

View File

@ -0,0 +1,40 @@
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.1:8778"]
username = "myuser"
password = "mypassword"
name_prefix = "java"
[[inputs.jolokia2_agent.metric]]
name = "Memory"
mbean = "java.lang:type=Memory/HeapMemoryUsage"
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.1:8778"]
username = "myuser"
password = "mypassword"
name_prefix = "cassandra"
[[inputs.jolokia2_agent.metric]]
name = "Table"
mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
field_prefix = "$2_"
tag_keys = ["keyspace", "name", "scope"]
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.2:8778", "http://localhost:8778"]
name_prefix = "java"
[[inputs.jolokia2_agent.metric]]
name = "Memory"
mbean = "java.lang:type=Memory/HeapMemoryUsage"
[[inputs.jolokia2_agent]]
urls = ["http://10.10.10.2:8778", "http://localhost:8778"]
name_prefix = "cassandra"
[[inputs.jolokia2_agent.metric]]
name = "Table"
mbean = "org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
field_prefix = "$2_"
tag_keys = ["keyspace", "name", "scope"]

View File

@ -0,0 +1,14 @@
[[inputs.cassandra]]
context = "/jolokia/read"
## List of cassandra servers exposing jolokia read service
servers = ["myuser:mypassword@10.10.10.1:8778","10.10.10.2:8778",":8778"]
## List of metrics collected on above servers
## Each metric consists of a jmx path.
## This will collect all heap memory usage metrics from the jvm and
## ReadLatency metrics for all keyspaces and tables.
## "type=Table" in the query works with Cassandra3.0. Older versions might
## need to use "type=ColumnFamily"
metrics = [
"/java.lang:type=Memory/HeapMemoryUsage",
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
]

33
migrations/registry.go Normal file
View File

@ -0,0 +1,33 @@
package migrations
import (
"fmt"
"github.com/influxdata/toml/ast"
)
type PluginMigrationFunc func(*ast.Table) ([]byte, error)
var PluginMigrations = make(map[string]PluginMigrationFunc)
func AddPluginMigration(name string, f PluginMigrationFunc) {
if _, found := PluginMigrations[name]; found {
panic(fmt.Errorf("plugin migration function already registered for %q", name))
}
PluginMigrations[name] = f
}
type pluginTOMLStruct map[string]map[string][]interface{}
func CreateTOMLStruct(category, name string) pluginTOMLStruct {
return map[string]map[string][]interface{}{
category: {
name: make([]interface{}, 0),
},
}
}
func (p *pluginTOMLStruct) Add(category, name string, plugin interface{}) {
cfg := map[string]map[string][]interface{}(*p)
cfg[category][name] = append(cfg[category][name], plugin)
}

View File

@ -109,7 +109,7 @@ func (s *selection) Filter(p packageCollection) *packageCollection {
func (s *selection) importFiles(configurations []string) error {
for _, cfg := range configurations {
buf, err := config.LoadConfigFile(cfg)
buf, _, err := config.LoadConfigFile(cfg)
if err != nil {
return fmt.Errorf("reading %q failed: %w", cfg, err)
}

View File

@ -63,7 +63,7 @@ func usage() {
}
func main() {
var dryrun, showtags, quiet bool
var dryrun, showtags, migrations, quiet bool
var configFiles, configDirs []string
flag.Func("config",
@ -82,6 +82,7 @@ func main() {
)
flag.BoolVar(&dryrun, "dry-run", false, "Skip the actual building step")
flag.BoolVar(&quiet, "quiet", false, "Print fewer log messages")
flag.BoolVar(&migrations, "migrations", false, "Include configuration migrations")
flag.BoolVar(&showtags, "tags", false, "Show build-tags used")
flag.Usage = usage
@ -125,7 +126,11 @@ func main() {
if len(tagset) == 0 {
log.Fatalln("Nothing selected!")
}
tags := "custom," + strings.Join(tagset, ",")
tags := "custom,"
if migrations {
tags += "migrations,"
}
tags += strings.Join(tagset, ",")
if showtags {
fmt.Printf("Build tags: %s\n", tags)
}