feat(migrations): Add migration for 'inputs.httpjson' (#13532)

Sven Rebhan 2023-07-05 20:40:22 +02:00 committed by GitHub
parent 10c081dab1
commit 86c11bce29
21 changed files with 473 additions and 31 deletions

View File

@@ -160,10 +160,13 @@ func ApplyMigrations(data []byte) ([]byte, uint64, error) {
}
log.Printf("D! migrating plugin %q in line %d...", s.name, s.begin)
-result, err := migrate(s.content)
+result, msg, err := migrate(s.content)
if err != nil {
return nil, 0, fmt.Errorf("migrating %q (line %d) failed: %w", s.name, s.begin, err)
}
+if msg != "" {
+log.Printf("I! Plugin %q in line %d: %s", s.name, s.begin, msg)
+}
s.raw = bytes.NewBuffer(result)
sections[idx] = s
applied++
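
For orientation, a minimal sketch of driving this entry point from user code, using only identifiers visible in this change (config.ApplyMigrations and the blank-import registration of the new migration); the file names and error handling are illustrative, not part of this commit:

package main

import (
	"log"
	"os"

	"github.com/influxdata/telegraf/config"

	// Blank import registers the new migration with the migrations registry.
	_ "github.com/influxdata/telegraf/migrations/inputs_httpjson"
)

func main() {
	// Read a legacy configuration that still contains [[inputs.httpjson]] sections.
	data, err := os.ReadFile("telegraf.conf") // illustrative path
	if err != nil {
		log.Fatalf("reading config failed: %v", err)
	}

	// Apply all registered migrations; 'applied' counts migrated plugin sections,
	// and per-plugin messages (such as the httpjson warning) are logged on the way.
	migrated, applied, err := config.ApplyMigrations(data)
	if err != nil {
		log.Fatalf("migration failed: %v", err)
	}
	log.Printf("applied %d migration(s)", applied)

	// Write the migrated configuration to a new file.
	if err := os.WriteFile("telegraf_migrated.conf", migrated, 0o644); err != nil {
		log.Fatalf("writing migrated config failed: %v", err)
	}
}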

View File

@@ -0,0 +1,5 @@
//go:build !custom || (migrations && (inputs || inputs.httpjson))
package all
import _ "github.com/influxdata/telegraf/migrations/inputs_httpjson" // register migration

View File

@@ -1,15 +1,15 @@
package common
type FilterOptions struct {
-NamePass []string `toml:"namepass"`
-NameDrop []string `toml:"namedrop"`
-FieldPassOld []string `toml:"pass"`
-FieldPass []string `toml:"fieldpass"`
-FieldDropOld []string `toml:"drop"`
-FieldDrop []string `toml:"fielddrop"`
-TagPassFilters map[string][]string `toml:"tagpass"`
-TagDropFilters map[string][]string `toml:"tagdrop"`
-TagExclude []string `toml:"tagexclude"`
-TagInclude []string `toml:"taginclude"`
-MetricPass string `toml:"metricpass"`
+NamePass []string `toml:"namepass,omitempty"`
+NameDrop []string `toml:"namedrop,omitempty"`
+FieldPassOld []string `toml:"pass,omitempty"`
+FieldPass []string `toml:"fieldpass,omitempty"`
+FieldDropOld []string `toml:"drop,omitempty"`
+FieldDrop []string `toml:"fielddrop,omitempty"`
+TagPassFilters map[string][]string `toml:"tagpass,omitempty"`
+TagDropFilters map[string][]string `toml:"tagdrop,omitempty"`
+TagExclude []string `toml:"tagexclude,omitempty"`
+TagInclude []string `toml:"taginclude,omitempty"`
+MetricPass string `toml:"metricpass,omitempty"`
}

View File

@@ -1,14 +1,27 @@
package common
type InputOptions struct {
-Interval string `toml:"interval"`
-Precision string `toml:"precision"`
-CollectionJitter string `toml:"collection_jitter"`
-CollectionOffset string `toml:"collection_offset"`
-NamePrefix string `toml:"name_prefix"`
-NameSuffix string `toml:"name_suffix"`
-NameOverride string `toml:"name_override"`
-Alias string `toml:"alias"`
-Tags map[string]string `toml:"tags"`
-FilterOptions
+// General options
+Interval string `toml:"interval,omitempty"`
+Precision string `toml:"precision,omitempty"`
+CollectionJitter string `toml:"collection_jitter,omitempty"`
+CollectionOffset string `toml:"collection_offset,omitempty"`
+NamePrefix string `toml:"name_prefix,omitempty"`
+NameSuffix string `toml:"name_suffix,omitempty"`
+NameOverride string `toml:"name_override,omitempty"`
+Alias string `toml:"alias,omitempty"`
+Tags map[string]string `toml:"tags,omitempty"`
+// Filter options
+NamePass []string `toml:"namepass,omitempty"`
+NameDrop []string `toml:"namedrop,omitempty"`
+FieldPassOld []string `toml:"pass,omitempty"`
+FieldPass []string `toml:"fieldpass,omitempty"`
+FieldDropOld []string `toml:"drop,omitempty"`
+FieldDrop []string `toml:"fielddrop,omitempty"`
+TagPassFilters map[string][]string `toml:"tagpass,omitempty"`
+TagDropFilters map[string][]string `toml:"tagdrop,omitempty"`
+TagExclude []string `toml:"tagexclude,omitempty"`
+TagInclude []string `toml:"taginclude,omitempty"`
+MetricPass string `toml:"metricpass,omitempty"`
}
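
The added ',omitempty' options matter because the migrations re-marshal these option structs into the new plugin table, and zero-valued settings should not clutter the generated config. A small sketch of the intended effect, assuming the influxdata/toml marshaller skips zero-valued fields tagged omitempty (the values are illustrative):

package main

import (
	"fmt"

	"github.com/influxdata/toml"

	"github.com/influxdata/telegraf/migrations/common"
)

func main() {
	// Only the interval is set; every other option is zero-valued.
	opts := common.InputOptions{Interval: "10s"}

	buf, err := toml.Marshal(opts)
	if err != nil {
		panic(err)
	}

	// With the ',omitempty' tags the output should contain just the interval
	// line instead of a long list of empty keys.
	fmt.Print(string(buf))
}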

View File

@@ -58,11 +58,11 @@ type jolokiaAgent struct {
}
// Migration function
-func migrate(tbl *ast.Table) ([]byte, error) {
+func migrate(tbl *ast.Table) ([]byte, string, error) {
// Decode the old data structure
var old cassandra
if err := toml.UnmarshalTable(tbl, &old); err != nil {
-return nil, err
+return nil, "", err
}
// Collect servers that use the same credentials
@@ -70,10 +70,10 @@ func migrate(tbl *ast.Table) ([]byte, error) {
for _, server := range old.Servers {
u, err := url.Parse("http://" + server)
if err != nil {
-return nil, fmt.Errorf("invalid url %q: %w", server, err)
+return nil, "", fmt.Errorf("invalid url %q: %w", server, err)
}
if u.Path != "" {
-return nil, fmt.Errorf("unexpected path in %q: %w", server, err)
+return nil, "", fmt.Errorf("unexpected path in %q: %w", server, err)
}
if u.Hostname() == "" {
u.Host = "localhost:" + u.Port()
@@ -110,7 +110,7 @@ func migrate(tbl *ast.Table) ([]byte, error) {
name, found := params["type"]
if !found {
-return nil, fmt.Errorf("cannot determine name for metric %q", metric)
+return nil, "", fmt.Errorf("cannot determine name for metric %q", metric)
}
name = strings.SplitN(name, "/", 2)[0]
@@ -147,7 +147,7 @@ func migrate(tbl *ast.Table) ([]byte, error) {
FieldPrefix: prefix,
})
default:
-return nil, fmt.Errorf("unknown java metric %q", metric)
+return nil, "", fmt.Errorf("unknown java metric %q", metric)
}
}
@@ -182,12 +182,12 @@ func migrate(tbl *ast.Table) ([]byte, error) {
// Marshal the new configuration
buf, err := toml.Marshal(cfg)
if err != nil {
-return nil, err
+return nil, "", err
}
buf = append(buf, []byte("\n")...)
// Create the new content to output
-return buf, nil
+return buf, "", nil
}
func (j *jolokiaAgent) fillCommon(o common.InputOptions) {

View File

@@ -0,0 +1,142 @@
package inputs_httpjson
import (
"fmt"
"net/url"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
"github.com/influxdata/telegraf/migrations"
"github.com/influxdata/telegraf/migrations/common"
"github.com/influxdata/telegraf/plugins/common/tls"
)
const msg = `
Replacement 'inputs.http' will not report the 'response_time' field and the
'server' tag is replaced by the 'url' tag. Please adapt your queries!
`
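// For illustration only (values made up): the old plugin tagged metrics with
// 'server' and added a 'response_time' field, e.g.
//   httpjson,server=http://localhost:9999/stats/ a=0.5,response_time=0.042
// while the replacement 'inputs.http' produces
//   httpjson,url=http://localhost:9999/stats/ a=0.5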
// Define "old" data structure
type httpJSON struct {
Name string `toml:"name"`
Servers []string
Method string
TagKeys []string
ResponseTimeout string
Parameters map[string]string
Headers map[string]string
tls.ClientConfig
common.InputOptions
}
// Migration function
func migrate(tbl *ast.Table) ([]byte, string, error) {
// Decode the old data structure
var old httpJSON
if err := toml.UnmarshalTable(tbl, &old); err != nil {
return nil, "", err
}
// Fill common options
plugin := make(map[string]interface{})
general, err := toml.Marshal(old.InputOptions)
if err != nil {
return nil, "", fmt.Errorf("marshalling general options failed: %w", err)
}
if err := toml.Unmarshal(general, &plugin); err != nil {
return nil, "", fmt.Errorf("re-unmarshalling general options failed: %w", err)
}
// Use a map for the new plugin and fill in the data
plugin["urls"] = old.Servers
if old.Name != "" {
if x, found := plugin["name_override"]; found && x != old.Name {
return nil, "", fmt.Errorf("conflicting 'name' (%s) and 'name_override' (%s) setting", old.Name, old.NameOverride)
}
plugin["name_override"] = old.Name
}
if _, found := plugin["name_override"]; !found {
plugin["name_override"] = "httpjson"
}
if old.Method != "" && old.Method != "GET" {
plugin["method"] = old.Method
}
if len(old.TagKeys) > 0 {
plugin["tag_keys"] = old.TagKeys
}
if old.ResponseTimeout != "" {
plugin["timeout"] = old.ResponseTimeout
}
if len(old.Headers) > 0 {
plugin["headers"] = old.Headers
}
if len(old.Parameters) > 0 {
urls := make([]string, 0, len(old.Servers))
for _, s := range old.Servers {
u, err := url.Parse(s)
if err != nil {
return nil, "", fmt.Errorf("parsing server %q failed: %w", s, err)
}
q := u.Query()
for k, v := range old.Parameters {
q.Add(k, v)
}
u.RawQuery = q.Encode()
urls = append(urls, u.String())
}
plugin["urls"] = urls
}
// Convert TLS parameters
if old.TLSCA != "" {
plugin["tls_ca"] = old.TLSCA
}
if old.TLSCert != "" {
plugin["tls_cert"] = old.TLSCert
}
if old.TLSKey != "" {
plugin["tls_key"] = old.TLSKey
}
if old.TLSKeyPwd != "" {
plugin["tls_key_pwd"] = old.TLSKeyPwd
}
if old.TLSMinVersion != "" {
plugin["tls_min_version"] = old.TLSMinVersion
}
if old.InsecureSkipVerify {
plugin["insecure_skip_verify"] = true
}
if old.ServerName != "" {
plugin["tls_server_name"] = old.ServerName
}
if old.RenegotiationMethod != "" {
plugin["tls_renegotiation_method"] = old.RenegotiationMethod
}
if old.Enable != nil {
plugin["tls_enable"] = *old.Enable
}
// Parser settings
plugin["data_format"] = "json"
// Create the corresponding metric configurations
cfg := migrations.CreateTOMLStruct("inputs", "http")
cfg.Add("inputs", "http", plugin)
// Marshal the new configuration
buf, err := toml.Marshal(cfg)
if err != nil {
return nil, "", err
}
buf = append(buf, []byte("\n")...)
// Create the new content to output
return buf, msg, nil
}
// Register the migration function for the plugin type
func init() {
migrations.AddPluginMigration("inputs.httpjson", migrate)
}

View File

@@ -0,0 +1,147 @@
package inputs_httpjson_test
import (
"errors"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
_ "github.com/influxdata/telegraf/migrations/inputs_httpjson" // register migration
httpplugin "github.com/influxdata/telegraf/plugins/inputs/http" // register plugin
_ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}
t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
inputFile := filepath.Join(testcasePath, "telegraf.conf")
expectedFile := filepath.Join(testcasePath, "expected.conf")
// Read the expected output
expected := config.NewConfig()
require.NoError(t, expected.LoadConfig(expectedFile))
require.NotEmpty(t, expected.Inputs)
// Read the input data
input, remote, err := config.LoadConfigFile(inputFile)
require.NoError(t, err)
require.False(t, remote)
require.NotEmpty(t, input)
// Migrate
output, n, err := config.ApplyMigrations(input)
require.NoError(t, err)
require.NotEmpty(t, output)
require.GreaterOrEqual(t, n, uint64(1))
actual := config.NewConfig()
require.NoError(t, actual.LoadConfigData(output))
// Test the output
require.Len(t, actual.Inputs, len(expected.Inputs))
actualIDs := make([]string, 0, len(expected.Inputs))
expectedIDs := make([]string, 0, len(expected.Inputs))
for i := range actual.Inputs {
actualIDs = append(actualIDs, actual.Inputs[i].ID())
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
}
require.ElementsMatchf(t, expectedIDs, actualIDs, "generated config: %s", string(output))
})
}
}
func TestParsing(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}
testcasePath := filepath.Join("testcases", f.Name())
configFile := filepath.Join(testcasePath, "expected.conf")
inputFile := filepath.Join(testcasePath, "input.json")
expectedFile := filepath.Join(testcasePath, "output.influx")
// Skip the testcase if it doesn't provide data
if _, err := os.Stat(inputFile); errors.Is(err, os.ErrNotExist) {
continue
}
t.Run(f.Name(), func(t *testing.T) {
parser := &influx.Parser{}
require.NoError(t, parser.Init())
// Configure the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfig(configFile))
require.Len(t, cfg.Inputs, 1)
plugin := cfg.Inputs[0].Input.(*httpplugin.HTTP)
// Read the input data
input, err := os.ReadFile(inputFile)
require.NoError(t, err)
require.NotEmpty(t, input)
// Read the expected output
expected, err := testutil.ParseMetricsFromFile(expectedFile, parser)
require.NoError(t, err)
require.NotEmpty(t, expected)
// Start the test-server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/stats" {
_, _ = w.Write(input)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
// Point the plugin to the test-server and start the game
addr := server.URL + "/stats"
plugin.URLs = []string{addr}
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
// Prepare metrics for comparison
for i := range expected {
expected[i].AddTag("url", addr)
}
raw := acc.GetTelegrafMetrics()
actual := make([]telegraf.Metric, 0, len(raw))
for _, m := range raw {
actual = append(actual, cfg.Inputs[0].MakeMetric(m))
}
// Compare
options := []cmp.Option{
testutil.IgnoreTime(),
testutil.IgnoreTags("host"),
}
testutil.RequireMetricsEqual(t, expected, actual, options...)
})
}
}

View File

@@ -0,0 +1,5 @@
[[inputs.http]]
name_override = "httpjson"
urls = ["http://localhost:9999/stats/"]
data_format = "json"
tag_keys = ["service"]

View File

@@ -0,0 +1,20 @@
[
{
"service": "service01",
"a": 0.5,
"b": {
"c": "some text",
"d": 0.1,
"e": 5
}
},
{
"service": "service02",
"a": 0.6,
"b": {
"c": "some text",
"d": 0.2,
"e": 6
}
}
]

View File

@@ -0,0 +1,2 @@
httpjson,url=http://localhost:9999/stats/,service=service01 a=0.5,b_d=0.1,b_e=5
httpjson,url=http://localhost:9999/stats/,service=service02 a=0.6,b_d=0.2,b_e=6

View File

@@ -0,0 +1,3 @@
[[inputs.httpjson]]
servers = ["http://localhost:9999/stats/"]
tag_keys = ["service"]

View File

@@ -0,0 +1,8 @@
[[inputs.http]]
name_override = "httpjson"
method = "POST"
urls = ["http://localhost:9999/stats"]
data_format = "json"
[inputs.http.headers]
X-Auth-Token = "my-xauth-token"
apiVersion = "v1"

View File

@@ -0,0 +1,6 @@
[[inputs.httpjson]]
servers = ["http://localhost:9999/stats"]
method = "POST"
[inputs.httpjson.headers]
X-Auth-Token = "my-xauth-token"
apiVersion = "v1"

View File

@@ -0,0 +1,4 @@
[[inputs.http]]
name_override = "httpjson"
urls = ["http://localhost:9999/stats?event_type=cpu_spike&threshold=0.75"]
data_format = "json"

View File

@@ -0,0 +1,8 @@
[[inputs.httpjson]]
servers = ["http://localhost:9999/stats"]
# HTTP Request Parameters (all values must be strings). For "GET" requests, data
# will be included in the query. For "POST" requests, data will be included
# in the request body as "x-www-form-urlencoded".
[inputs.httpjson.parameters]
event_type = "cpu_spike"
threshold = "0.75"

View File

@@ -0,0 +1,5 @@
[[inputs.http]]
name_override = "webserver_stats"
timeout = "5s"
urls = ["http://localhost:9999/stats/", "http://localhost:9998/stats/"]
data_format = "json"

View File

@@ -0,0 +1,9 @@
{
"a": 0.5,
"b": {
"c": "some text",
"d": 0.1,
"e": 5
},
"service": "service01"
}

View File

@@ -0,0 +1 @@
webserver_stats,url=http://localhost:9999/stats/ b_d=0.1,a=0.5,b_e=5

View File

@@ -0,0 +1,46 @@
# Read flattened metrics from one or more JSON HTTP endpoints
[[inputs.httpjson]]
## NOTE This plugin only reads numerical measurements, strings and booleans
## will be ignored.
## Name for the service being polled. Will be appended to the name of the
## measurement e.g. "httpjson_webserver_stats".
##
## Deprecated (1.3.0): Use name_override, name_suffix, name_prefix instead.
name = "webserver_stats"
## URL of each server in the service's cluster
servers = [
"http://localhost:9999/stats/",
"http://localhost:9998/stats/",
]
## Set response_timeout (default 5 seconds)
response_timeout = "5s"
## HTTP method to use: GET or POST (case-sensitive)
method = "GET"
## Tags to extract from top-level of JSON server response.
# tag_keys = [
# "my_tag_1",
# "my_tag_2"
# ]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## HTTP Request Parameters (all values must be strings). For "GET" requests, data
## will be included in the query. For "POST" requests, data will be included
## in the request body as "x-www-form-urlencoded".
# [inputs.httpjson.parameters]
# event_type = "cpu_spike"
# threshold = "0.75"
## HTTP Request Headers (all values must be strings).
# [inputs.httpjson.headers]
# X-Auth-Token = "my-xauth-token"
# apiVersion = "v1"

View File

@@ -6,7 +6,7 @@ import (
"github.com/influxdata/toml/ast"
)
-type PluginMigrationFunc func(*ast.Table) ([]byte, error)
+type PluginMigrationFunc func(*ast.Table) ([]byte, string, error)
var PluginMigrations = make(map[string]PluginMigrationFunc)

View File

@@ -143,6 +143,21 @@ func IgnoreFields(names ...string) cmp.Option {
)
}
// IgnoreTags disables comparison of the tags with the given names.
// The tag-names are case-sensitive!
func IgnoreTags(names ...string) cmp.Option {
return cmpopts.IgnoreSliceElements(
func(f *telegraf.Tag) bool {
for _, n := range names {
if f.Key == n {
return true
}
}
return false
},
)
}
// MetricEqual returns true if the metrics are equal.
func MetricEqual(expected, actual telegraf.Metric, opts ...cmp.Option) bool {
var lhs, rhs *metricDiff