diff --git a/config/migration.go b/config/migration.go index fe0be54eb..453e7ade4 100644 --- a/config/migration.go +++ b/config/migration.go @@ -160,10 +160,13 @@ func ApplyMigrations(data []byte) ([]byte, uint64, error) { } log.Printf("D! migrating plugin %q in line %d...", s.name, s.begin) - result, err := migrate(s.content) + result, msg, err := migrate(s.content) if err != nil { return nil, 0, fmt.Errorf("migrating %q (line %d) failed: %w", s.name, s.begin, err) } + if msg != "" { + log.Printf("I! Plugin %q in line %d: %s", s.name, s.begin, msg) + } s.raw = bytes.NewBuffer(result) sections[idx] = s applied++ diff --git a/migrations/all/inputs_httpjson.go b/migrations/all/inputs_httpjson.go new file mode 100644 index 000000000..04d6ba32b --- /dev/null +++ b/migrations/all/inputs_httpjson.go @@ -0,0 +1,5 @@ +//go:build !custom || (migrations && (inputs || inputs.httpjson)) + +package all + +import _ "github.com/influxdata/telegraf/migrations/inputs_httpjson" // register migration diff --git a/migrations/common/filter_options.go b/migrations/common/filter_options.go index 7da9e8cc3..450fe580e 100644 --- a/migrations/common/filter_options.go +++ b/migrations/common/filter_options.go @@ -1,15 +1,15 @@ package common type FilterOptions struct { - NamePass []string `toml:"namepass"` - NameDrop []string `toml:"namedrop"` - FieldPassOld []string `toml:"pass"` - FieldPass []string `toml:"fieldpass"` - FieldDropOld []string `toml:"drop"` - FieldDrop []string `toml:"fielddrop"` - TagPassFilters map[string][]string `toml:"tagpass"` - TagDropFilters map[string][]string `toml:"tagdrop"` - TagExclude []string `toml:"tagexclude"` - TagInclude []string `toml:"taginclude"` - MetricPass string `toml:"metricpass"` + NamePass []string `toml:"namepass,omitempty"` + NameDrop []string `toml:"namedrop,omitempty"` + FieldPassOld []string `toml:"pass,omitempty"` + FieldPass []string `toml:"fieldpass,omitempty"` + FieldDropOld []string `toml:"drop,omitempty"` + FieldDrop []string `toml:"fielddrop,omitempty"` + TagPassFilters map[string][]string `toml:"tagpass,omitempty"` + TagDropFilters map[string][]string `toml:"tagdrop,omitempty"` + TagExclude []string `toml:"tagexclude,omitempty"` + TagInclude []string `toml:"taginclude,omitempty"` + MetricPass string `toml:"metricpass,omitempty"` } diff --git a/migrations/common/input_options.go b/migrations/common/input_options.go index 4ab414021..c5d579c74 100644 --- a/migrations/common/input_options.go +++ b/migrations/common/input_options.go @@ -1,14 +1,27 @@ package common type InputOptions struct { - Interval string `toml:"interval"` - Precision string `toml:"precision"` - CollectionJitter string `toml:"collection_jitter"` - CollectionOffset string `toml:"collection_offset"` - NamePrefix string `toml:"name_prefix"` - NameSuffix string `toml:"name_suffix"` - NameOverride string `toml:"name_override"` - Alias string `toml:"alias"` - Tags map[string]string `toml:"tags"` - FilterOptions + // General options + Interval string `toml:"interval,omitempty"` + Precision string `toml:"precision,omitempty"` + CollectionJitter string `toml:"collection_jitter,omitempty"` + CollectionOffset string `toml:"collection_offset,omitempty"` + NamePrefix string `toml:"name_prefix,omitempty"` + NameSuffix string `toml:"name_suffix,omitempty"` + NameOverride string `toml:"name_override,omitempty"` + Alias string `toml:"alias,omitempty"` + Tags map[string]string `toml:"tags,omitempty"` + + // Filter options + NamePass []string `toml:"namepass,omitempty"` + NameDrop []string `toml:"namedrop,omitempty"` + FieldPassOld []string `toml:"pass,omitempty"` + FieldPass []string `toml:"fieldpass,omitempty"` + FieldDropOld []string `toml:"drop,omitempty"` + FieldDrop []string `toml:"fielddrop,omitempty"` + TagPassFilters map[string][]string `toml:"tagpass,omitempty"` + TagDropFilters map[string][]string `toml:"tagdrop,omitempty"` + TagExclude []string `toml:"tagexclude,omitempty"` + TagInclude []string `toml:"taginclude,omitempty"` + MetricPass string `toml:"metricpass,omitempty"` } diff --git a/migrations/inputs_cassandra/migration.go b/migrations/inputs_cassandra/migration.go index 5dd962775..194c62b0c 100644 --- a/migrations/inputs_cassandra/migration.go +++ b/migrations/inputs_cassandra/migration.go @@ -58,11 +58,11 @@ type jolokiaAgent struct { } // Migration function -func migrate(tbl *ast.Table) ([]byte, error) { +func migrate(tbl *ast.Table) ([]byte, string, error) { // Decode the old data structure var old cassandra if err := toml.UnmarshalTable(tbl, &old); err != nil { - return nil, err + return nil, "", err } // Collect servers that use the same credentials @@ -70,10 +70,10 @@ func migrate(tbl *ast.Table) ([]byte, error) { for _, server := range old.Servers { u, err := url.Parse("http://" + server) if err != nil { - return nil, fmt.Errorf("invalid url %q: %w", server, err) + return nil, "", fmt.Errorf("invalid url %q: %w", server, err) } if u.Path != "" { - return nil, fmt.Errorf("unexpected path in %q: %w", server, err) + return nil, "", fmt.Errorf("unexpected path in %q: %w", server, err) } if u.Hostname() == "" { u.Host = "localhost:" + u.Port() @@ -110,7 +110,7 @@ func migrate(tbl *ast.Table) ([]byte, error) { name, found := params["type"] if !found { - return nil, fmt.Errorf("cannot determine name for metric %q", metric) + return nil, "", fmt.Errorf("cannot determine name for metric %q", metric) } name = strings.SplitN(name, "/", 2)[0] @@ -147,7 +147,7 @@ func migrate(tbl *ast.Table) ([]byte, error) { FieldPrefix: prefix, }) default: - return nil, fmt.Errorf("unknown java metric %q", metric) + return nil, "", fmt.Errorf("unknown java metric %q", metric) } } @@ -182,12 +182,12 @@ func migrate(tbl *ast.Table) ([]byte, error) { // Marshal the new configuration buf, err := toml.Marshal(cfg) if err != nil { - return nil, err + return nil, "", err } buf = append(buf, []byte("\n")...) // Create the new content to output - return buf, nil + return buf, "", nil } func (j *jolokiaAgent) fillCommon(o common.InputOptions) { diff --git a/migrations/inputs_httpjson/migration.go b/migrations/inputs_httpjson/migration.go new file mode 100644 index 000000000..d6cd513fe --- /dev/null +++ b/migrations/inputs_httpjson/migration.go @@ -0,0 +1,142 @@ +package inputs_httpjson + +import ( + "fmt" + "net/url" + + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + + "github.com/influxdata/telegraf/migrations" + "github.com/influxdata/telegraf/migrations/common" + "github.com/influxdata/telegraf/plugins/common/tls" +) + +const msg = ` + Replacement 'inputs.http' will not report the 'response_time' field and the + 'server' tag is replaced by the 'url' tag. Please adapt your queries! +` + +// Define "old" data structure +type httpJSON struct { + Name string `toml:"name"` + Servers []string + Method string + TagKeys []string + ResponseTimeout string + Parameters map[string]string + Headers map[string]string + tls.ClientConfig + common.InputOptions +} + +// Migration function +func migrate(tbl *ast.Table) ([]byte, string, error) { + // Decode the old data structure + var old httpJSON + if err := toml.UnmarshalTable(tbl, &old); err != nil { + return nil, "", err + } + + // Fill common options + plugin := make(map[string]interface{}) + general, err := toml.Marshal(old.InputOptions) + if err != nil { + return nil, "", fmt.Errorf("marshalling general options failed: %w", err) + } + if err := toml.Unmarshal(general, &plugin); err != nil { + return nil, "", fmt.Errorf("re-unmarshalling general options failed: %w", err) + } + + // Use a map for the new plugin and fill in the data + plugin["urls"] = old.Servers + if old.Name != "" { + if x, found := plugin["name_override"]; found && x != old.Name { + return nil, "", fmt.Errorf("conflicting 'name' (%s) and 'name_override' (%s) setting", old.Name, old.NameOverride) + } + plugin["name_override"] = old.Name + } + if _, found := plugin["name_override"]; !found { + plugin["name_override"] = "httpjson" + } + if old.Method != "" && old.Method != "GET" { + plugin["method"] = old.Method + } + if len(old.TagKeys) > 0 { + plugin["tag_keys"] = old.TagKeys + } + if old.ResponseTimeout != "" { + plugin["timeout"] = old.ResponseTimeout + } + if len(old.Headers) > 0 { + plugin["headers"] = old.Headers + } + if len(old.Parameters) > 0 { + urls := make([]string, 0, len(old.Servers)) + for _, s := range old.Servers { + u, err := url.Parse(s) + if err != nil { + return nil, "", fmt.Errorf("parsing server %q failed: %w", s, err) + } + q := u.Query() + for k, v := range old.Parameters { + q.Add(k, v) + } + u.RawQuery = q.Encode() + urls = append(urls, u.String()) + } + plugin["urls"] = urls + } + + // Convert TLS parameters + if old.TLSCA != "" { + plugin["tls_ca"] = old.TLSCA + } + if old.TLSCert != "" { + plugin["tls_cert"] = old.TLSCert + } + + if old.TLSKey != "" { + plugin["tls_key"] = old.TLSKey + } + if old.TLSKeyPwd != "" { + plugin["tls_key_pwd"] = old.TLSKeyPwd + } + if old.TLSMinVersion != "" { + plugin["tls_min_version"] = old.TLSMinVersion + } + if old.InsecureSkipVerify { + plugin["insecure_skip_verify"] = true + } + if old.ServerName != "" { + plugin["tls_server_name"] = old.ServerName + } + if old.RenegotiationMethod != "" { + plugin["tls_renegotiation_method"] = old.RenegotiationMethod + } + if old.Enable != nil { + plugin["tls_enable"] = *old.Enable + } + + // Parser settings + plugin["data_format"] = "json" + + // Create the corresponding metric configurations + cfg := migrations.CreateTOMLStruct("inputs", "http") + cfg.Add("inputs", "http", plugin) + + // Marshal the new configuration + buf, err := toml.Marshal(cfg) + if err != nil { + return nil, "", err + } + buf = append(buf, []byte("\n")...) + + // Create the new content to output + return buf, msg, nil +} + +// Register the migration function for the plugin type +func init() { + migrations.AddPluginMigration("inputs.httpjson", migrate) +} diff --git a/migrations/inputs_httpjson/migration_test.go b/migrations/inputs_httpjson/migration_test.go new file mode 100644 index 000000000..02cbce2ea --- /dev/null +++ b/migrations/inputs_httpjson/migration_test.go @@ -0,0 +1,147 @@ +package inputs_httpjson_test + +import ( + "errors" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + _ "github.com/influxdata/telegraf/migrations/inputs_httpjson" // register migration + httpplugin "github.com/influxdata/telegraf/plugins/inputs/http" // register plugin + _ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" +) + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + inputFile := filepath.Join(testcasePath, "telegraf.conf") + expectedFile := filepath.Join(testcasePath, "expected.conf") + + // Read the expected output + expected := config.NewConfig() + require.NoError(t, expected.LoadConfig(expectedFile)) + require.NotEmpty(t, expected.Inputs) + + // Read the input data + input, remote, err := config.LoadConfigFile(inputFile) + require.NoError(t, err) + require.False(t, remote) + require.NotEmpty(t, input) + + // Migrate + output, n, err := config.ApplyMigrations(input) + require.NoError(t, err) + require.NotEmpty(t, output) + require.GreaterOrEqual(t, n, uint64(1)) + actual := config.NewConfig() + require.NoError(t, actual.LoadConfigData(output)) + + // Test the output + require.Len(t, actual.Inputs, len(expected.Inputs)) + actualIDs := make([]string, 0, len(expected.Inputs)) + expectedIDs := make([]string, 0, len(expected.Inputs)) + for i := range actual.Inputs { + actualIDs = append(actualIDs, actual.Inputs[i].ID()) + expectedIDs = append(expectedIDs, expected.Inputs[i].ID()) + } + require.ElementsMatchf(t, expectedIDs, actualIDs, "generated config: %s", string(output)) + }) + } +} + +func TestParsing(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + testcasePath := filepath.Join("testcases", f.Name()) + configFile := filepath.Join(testcasePath, "expected.conf") + inputFile := filepath.Join(testcasePath, "input.json") + expectedFile := filepath.Join(testcasePath, "output.influx") + + // Skip the testcase if it doesn't provide data + if _, err := os.Stat(inputFile); errors.Is(err, os.ErrNotExist) { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFile)) + require.Len(t, cfg.Inputs, 1) + plugin := cfg.Inputs[0].Input.(*httpplugin.HTTP) + + // Read the input data + input, err := os.ReadFile(inputFile) + require.NoError(t, err) + require.NotEmpty(t, input) + + // Read the expected output + expected, err := testutil.ParseMetricsFromFile(expectedFile, parser) + require.NoError(t, err) + require.NotEmpty(t, expected) + + // Start the test-server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/stats" { + _, _ = w.Write(input) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + // Point the plugin to the test-server and start the game + addr := server.URL + "/stats" + plugin.URLs = []string{addr} + require.NoError(t, plugin.Init()) + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + + // Prepare metrics for comparison + for i := range expected { + expected[i].AddTag("url", addr) + } + raw := acc.GetTelegrafMetrics() + actual := make([]telegraf.Metric, 0, len(raw)) + for _, m := range raw { + actual = append(actual, cfg.Inputs[0].MakeMetric(m)) + } + + // Compare + options := []cmp.Option{ + testutil.IgnoreTime(), + testutil.IgnoreTags("host"), + } + testutil.RequireMetricsEqual(t, expected, actual, options...) + }) + } +} diff --git a/migrations/inputs_httpjson/testcases/array/expected.conf b/migrations/inputs_httpjson/testcases/array/expected.conf new file mode 100644 index 000000000..e03d1ad53 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/array/expected.conf @@ -0,0 +1,5 @@ +[[inputs.http]] +name_override = "httpjson" +urls = ["http://localhost:9999/stats/"] +data_format = "json" +tag_keys = ["service"] diff --git a/migrations/inputs_httpjson/testcases/array/input.json b/migrations/inputs_httpjson/testcases/array/input.json new file mode 100644 index 000000000..a3227e5c1 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/array/input.json @@ -0,0 +1,20 @@ +[ + { + "service": "service01", + "a": 0.5, + "b": { + "c": "some text", + "d": 0.1, + "e": 5 + } + }, + { + "service": "service02", + "a": 0.6, + "b": { + "c": "some text", + "d": 0.2, + "e": 6 + } + } +] \ No newline at end of file diff --git a/migrations/inputs_httpjson/testcases/array/output.influx b/migrations/inputs_httpjson/testcases/array/output.influx new file mode 100644 index 000000000..4713410c2 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/array/output.influx @@ -0,0 +1,2 @@ +httpjson,url=http://localhost:9999/stats/,service=service01 a=0.5,b_d=0.1,b_e=5 +httpjson,url=http://localhost:9999/stats/,service=service02 a=0.6,b_d=0.2,b_e=6 \ No newline at end of file diff --git a/migrations/inputs_httpjson/testcases/array/telegraf.conf b/migrations/inputs_httpjson/testcases/array/telegraf.conf new file mode 100644 index 000000000..20f199327 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/array/telegraf.conf @@ -0,0 +1,3 @@ +[[inputs.httpjson]] + servers = ["http://localhost:9999/stats/"] + tag_keys = ["service"] \ No newline at end of file diff --git a/migrations/inputs_httpjson/testcases/headers/expected.conf b/migrations/inputs_httpjson/testcases/headers/expected.conf new file mode 100644 index 000000000..7a1492133 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/headers/expected.conf @@ -0,0 +1,8 @@ +[[inputs.http]] +name_override = "httpjson" +method = "POST" +urls = ["http://localhost:9999/stats"] +data_format = "json" +[inputs.http.headers] +X-Auth-Token = "my-xauth-token" +apiVersion = "v1" \ No newline at end of file diff --git a/migrations/inputs_httpjson/testcases/headers/telegraf.conf b/migrations/inputs_httpjson/testcases/headers/telegraf.conf new file mode 100644 index 000000000..5f54a9ea8 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/headers/telegraf.conf @@ -0,0 +1,6 @@ +[[inputs.httpjson]] + servers = ["http://localhost:9999/stats"] + method = "POST" + [inputs.httpjson.headers] + X-Auth-Token = "my-xauth-token" + apiVersion = "v1" \ No newline at end of file diff --git a/migrations/inputs_httpjson/testcases/params/expected.conf b/migrations/inputs_httpjson/testcases/params/expected.conf new file mode 100644 index 000000000..0d894c99e --- /dev/null +++ b/migrations/inputs_httpjson/testcases/params/expected.conf @@ -0,0 +1,4 @@ +[[inputs.http]] +name_override = "httpjson" +urls = ["http://localhost:9999/stats?event_type=cpu_spike&threshold=0.75"] +data_format = "json" diff --git a/migrations/inputs_httpjson/testcases/params/telegraf.conf b/migrations/inputs_httpjson/testcases/params/telegraf.conf new file mode 100644 index 000000000..48c8091b0 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/params/telegraf.conf @@ -0,0 +1,8 @@ +[[inputs.httpjson]] + servers = ["http://localhost:9999/stats"] + # HTTP Request Parameters (all values must be strings). For "GET" requests, data + # will be included in the query. For "POST" requests, data will be included + # in the request body as "x-www-form-urlencoded". + [inputs.httpjson.parameters] + event_type = "cpu_spike" + threshold = "0.75" diff --git a/migrations/inputs_httpjson/testcases/single/expected.conf b/migrations/inputs_httpjson/testcases/single/expected.conf new file mode 100644 index 000000000..55f01fc4a --- /dev/null +++ b/migrations/inputs_httpjson/testcases/single/expected.conf @@ -0,0 +1,5 @@ +[[inputs.http]] +name_override = "webserver_stats" +timeout = "5s" +urls = ["http://localhost:9999/stats/", "http://localhost:9998/stats/"] +data_format = "json" diff --git a/migrations/inputs_httpjson/testcases/single/input.json b/migrations/inputs_httpjson/testcases/single/input.json new file mode 100644 index 000000000..fa5531ee2 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/single/input.json @@ -0,0 +1,9 @@ +{ + "a": 0.5, + "b": { + "c": "some text", + "d": 0.1, + "e": 5 + }, + "service": "service01" +} \ No newline at end of file diff --git a/migrations/inputs_httpjson/testcases/single/output.influx b/migrations/inputs_httpjson/testcases/single/output.influx new file mode 100644 index 000000000..0424ef29d --- /dev/null +++ b/migrations/inputs_httpjson/testcases/single/output.influx @@ -0,0 +1 @@ +webserver_stats,url=http://localhost:9999/stats/ b_d=0.1,a=0.5,b_e=5 diff --git a/migrations/inputs_httpjson/testcases/single/telegraf.conf b/migrations/inputs_httpjson/testcases/single/telegraf.conf new file mode 100644 index 000000000..862eea653 --- /dev/null +++ b/migrations/inputs_httpjson/testcases/single/telegraf.conf @@ -0,0 +1,46 @@ +# Read flattened metrics from one or more JSON HTTP endpoints +[[inputs.httpjson]] + ## NOTE This plugin only reads numerical measurements, strings and booleans + ## will be ignored. + + ## Name for the service being polled. Will be appended to the name of the + ## measurement e.g. "httpjson_webserver_stats". + ## + ## Deprecated (1.3.0): Use name_override, name_suffix, name_prefix instead. + name = "webserver_stats" + + ## URL of each server in the service's cluster + servers = [ + "http://localhost:9999/stats/", + "http://localhost:9998/stats/", + ] + ## Set response_timeout (default 5 seconds) + response_timeout = "5s" + + ## HTTP method to use: GET or POST (case-sensitive) + method = "GET" + + ## Tags to extract from top-level of JSON server response. + # tag_keys = [ + # "my_tag_1", + # "my_tag_2" + # ] + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## HTTP Request Parameters (all values must be strings). For "GET" requests, data + ## will be included in the query. For "POST" requests, data will be included + ## in the request body as "x-www-form-urlencoded". + # [inputs.httpjson.parameters] + # event_type = "cpu_spike" + # threshold = "0.75" + + ## HTTP Request Headers (all values must be strings). + # [inputs.httpjson.headers] + # X-Auth-Token = "my-xauth-token" + # apiVersion = "v1" \ No newline at end of file diff --git a/migrations/registry.go b/migrations/registry.go index b399b0fc5..b03a663f7 100644 --- a/migrations/registry.go +++ b/migrations/registry.go @@ -6,7 +6,7 @@ import ( "github.com/influxdata/toml/ast" ) -type PluginMigrationFunc func(*ast.Table) ([]byte, error) +type PluginMigrationFunc func(*ast.Table) ([]byte, string, error) var PluginMigrations = make(map[string]PluginMigrationFunc) diff --git a/testutil/metric.go b/testutil/metric.go index f5a25758c..44f20a3d3 100644 --- a/testutil/metric.go +++ b/testutil/metric.go @@ -143,6 +143,21 @@ func IgnoreFields(names ...string) cmp.Option { ) } +// return disables comparison of the tags with the given names. +// The tag-names are case-sensitive! +func IgnoreTags(names ...string) cmp.Option { + return cmpopts.IgnoreSliceElements( + func(f *telegraf.Tag) bool { + for _, n := range names { + if f.Key == n { + return true + } + } + return false + }, + ) +} + // MetricEqual returns true if the metrics are equal. func MetricEqual(expected, actual telegraf.Metric, opts ...cmp.Option) bool { var lhs, rhs *metricDiff