feat(migrations): Add migration for inputs.udp_listener (#14120)
This commit is contained in:
parent
2e89bb5c2c
commit
8583f22528
|
|
@ -0,0 +1,5 @@
|
|||
//go:build !custom || (migrations && (inputs || inputs.udp_listener))
|
||||
|
||||
package all
|
||||
|
||||
import _ "github.com/influxdata/telegraf/migrations/inputs_udp_listener" // register migration
|
||||
|
|
@ -0,0 +1,73 @@
|
|||
package inputs_udp_listener
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/toml"
|
||||
"github.com/influxdata/toml/ast"
|
||||
|
||||
"github.com/influxdata/telegraf/migrations"
|
||||
)
|
||||
|
||||
const allowPendingMessagesMsg = `
|
||||
Replacement 'inputs.socket_listener' does not allow to configure
|
||||
'allowed_pending_messages' and thus the setting will be dropped.
|
||||
`
|
||||
|
||||
const udpPacketSizeMsg = `
|
||||
The deprecated 'udp_buffer_size' setting will be dropped.
|
||||
`
|
||||
|
||||
// Define "old" data structure
|
||||
type udpListener map[string]interface{}
|
||||
|
||||
// Migration function
|
||||
func migrate(tbl *ast.Table) ([]byte, string, error) {
|
||||
// Decode the old data structure
|
||||
var old udpListener
|
||||
if err := toml.UnmarshalTable(tbl, &old); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
// Copy the setting except the special plugin ones to preserve
|
||||
// all parser settings of the existing (deprecated) config.
|
||||
var msg string
|
||||
plugin := make(map[string]interface{}, len(old))
|
||||
for k, v := range old {
|
||||
switch k {
|
||||
case "service_address":
|
||||
addr, ok := v.(string)
|
||||
if !ok {
|
||||
return nil, "", fmt.Errorf("service_address is not a string but %T", v)
|
||||
}
|
||||
plugin["service_address"] = "udp://" + addr
|
||||
case "allowed_pending_messages":
|
||||
msg += allowPendingMessagesMsg
|
||||
case "udp_packet_size":
|
||||
msg += udpPacketSizeMsg
|
||||
case "udp_buffer_size":
|
||||
plugin["read_buffer_size"] = v
|
||||
default:
|
||||
plugin[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Create the corresponding metric configurations
|
||||
cfg := migrations.CreateTOMLStruct("inputs", "socket_listener")
|
||||
cfg.Add("inputs", "socket_listener", plugin)
|
||||
|
||||
// Marshal the new configuration
|
||||
buf, err := toml.Marshal(cfg)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
buf = append(buf, []byte("\n")...)
|
||||
|
||||
// Create the new content to output
|
||||
return buf, msg, nil
|
||||
}
|
||||
|
||||
// Register the migration function for the plugin type
|
||||
func init() {
|
||||
migrations.AddPluginMigration("inputs.udp_listener", migrate)
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
package inputs_udp_listener_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
_ "github.com/influxdata/telegraf/migrations/inputs_udp_listener" // register migration
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener" // register plugin
|
||||
_ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers
|
||||
)
|
||||
|
||||
func TestCases(t *testing.T) {
|
||||
// Get all directories in testdata
|
||||
folders, err := os.ReadDir("testcases")
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, f := range folders {
|
||||
// Only handle folders
|
||||
if !f.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
t.Run(f.Name(), func(t *testing.T) {
|
||||
testcasePath := filepath.Join("testcases", f.Name())
|
||||
inputFile := filepath.Join(testcasePath, "telegraf.conf")
|
||||
expectedFile := filepath.Join(testcasePath, "expected.conf")
|
||||
|
||||
// Read the expected output
|
||||
expected := config.NewConfig()
|
||||
require.NoError(t, expected.LoadConfig(expectedFile))
|
||||
require.NotEmpty(t, expected.Inputs)
|
||||
|
||||
// Read the input data
|
||||
input, remote, err := config.LoadConfigFile(inputFile)
|
||||
require.NoError(t, err)
|
||||
require.False(t, remote)
|
||||
require.NotEmpty(t, input)
|
||||
|
||||
// Migrate
|
||||
output, n, err := config.ApplyMigrations(input)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, output)
|
||||
require.GreaterOrEqual(t, n, uint64(1))
|
||||
actual := config.NewConfig()
|
||||
require.NoError(t, actual.LoadConfigData(output))
|
||||
|
||||
// Test the output
|
||||
require.Len(t, actual.Inputs, len(expected.Inputs))
|
||||
actualIDs := make([]string, 0, len(expected.Inputs))
|
||||
expectedIDs := make([]string, 0, len(expected.Inputs))
|
||||
for i := range actual.Inputs {
|
||||
actualIDs = append(actualIDs, actual.Inputs[i].ID())
|
||||
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
|
||||
}
|
||||
require.ElementsMatch(t, expectedIDs, actualIDs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
[[inputs.socket_listener]]
|
||||
service_address = "udp://127.0.0.1:8000"
|
||||
data_format = "influx"
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
[[inputs.udp_listener]]
|
||||
service_address = "127.0.0.1:8000"
|
||||
allowed_pending_messages = 1000
|
||||
udp_packet_size = 1024
|
||||
data_format = "influx"
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
[[inputs.socket_listener]]
|
||||
service_address = "udp://127.0.0.1:8000"
|
||||
data_format = "influx"
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
[[inputs.udp_listener]]
|
||||
service_address = "127.0.0.1:8000"
|
||||
allowed_pending_messages = 1000
|
||||
|
||||
data_format = "influx"
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
[[inputs.socket_listener]]
|
||||
service_address = "udp://127.0.0.1:8000"
|
||||
data_format = "xpath_json"
|
||||
xpath_native_types = true
|
||||
[[inputs.socket_listener.xpath]]
|
||||
metric_name = "/name"
|
||||
timestamp = "/timestamp"
|
||||
timestamp_format = "unix_ms"
|
||||
field_selection = "/fields/*"
|
||||
tag_selection = "/tags/*"
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
[[inputs.udp_listener]]
|
||||
service_address = "127.0.0.1:8000"
|
||||
|
||||
data_format = "xpath_json"
|
||||
xpath_native_types = true
|
||||
|
||||
# Configuration matching the first (ENERGY) message
|
||||
[[inputs.udp_listener.xpath]]
|
||||
metric_name = "/name"
|
||||
timestamp = "/timestamp"
|
||||
timestamp_format = "unix_ms"
|
||||
field_selection = "/fields/*"
|
||||
tag_selection = "/tags/*"
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
[[inputs.socket_listener]]
|
||||
service_address = "udp://127.0.0.1:8000"
|
||||
data_format = "influx"
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
[[inputs.udp_listener]]
|
||||
service_address = "127.0.0.1:8000"
|
||||
|
||||
data_format = "influx"
|
||||
Loading…
Reference in New Issue