feat(migrations): Add migration for fieldpass/fielddrop (#14401)

This commit is contained in:
Sven Rebhan 2023-12-07 20:31:30 +01:00 committed by GitHub
parent 5d598321bb
commit 2a81343ad3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 424 additions and 67 deletions

View File

@ -200,6 +200,31 @@ func ApplyMigrations(data []byte) ([]byte, uint64, error) {
applied++
}
// Do general migrations applying to all plugins
for idx, s := range sections {
parts := strings.Split(s.name, ".")
if len(parts) != 2 {
continue
}
log.Printf("D! applying general migrations to plugin %q in line %d...", s.name, s.begin)
category, name := parts[0], parts[1]
for _, migrate := range migrations.GeneralMigrations {
result, msg, err := migrate(category, name, s.content)
if err != nil {
if errors.Is(err, migrations.ErrNotApplicable) {
continue
}
return nil, 0, fmt.Errorf("migrating options of %q (line %d) failed: %w", s.name, s.begin, err)
}
if msg != "" {
log.Printf("I! Plugin %q in line %d: %s", s.name, s.begin, msg)
}
s.raw = bytes.NewBuffer(result)
applied++
}
sections[idx] = s
}
// Reconstruct the config file from the sections
var buf bytes.Buffer
for _, s := range sections {

View File

@ -0,0 +1,5 @@
//go:build !custom || migrations
package all
import _ "github.com/influxdata/telegraf/migrations/general_metricfilter" // register migration

View File

@ -0,0 +1,112 @@
package general_metricfilter
import (
"fmt"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/migrations"
)
// Migration function
func migrate(category, name string, tbl *ast.Table) ([]byte, string, error) {
// Filter options can only be present in inputs, outputs, processors and
// aggregators. Skip everything else...
switch category {
case "inputs", "outputs", "processors", "aggregators":
default:
return nil, "", migrations.ErrNotApplicable
}
// Decode the old data structure
var plugin map[string]interface{}
if err := toml.UnmarshalTable(tbl, &plugin); err != nil {
return nil, "", err
}
// Check for deprecated option(s) and migrate them
var applied bool
// Get the new field settings to be able to merge it with the deprecated
// settings
var fieldinclude []string
if newFieldInclude, found := plugin["fieldinclude"]; found {
var err error
fieldinclude, err = migrations.AsStringSlice(newFieldInclude)
if err != nil {
return nil, "", fmt.Errorf("setting 'fieldinclude': %w", err)
}
}
for _, option := range []string{"pass", "fieldpass"} {
if rawOld, found := plugin[option]; found {
applied = true
old, err := migrations.AsStringSlice(rawOld)
if err != nil {
return nil, "", fmt.Errorf("setting '%s': %w", option, err)
}
for _, o := range old {
if !choice.Contains(o, fieldinclude) {
fieldinclude = append(fieldinclude, o)
}
}
// Remove the deprecated setting
delete(plugin, option)
}
}
// Add the new option if it has data
if len(fieldinclude) > 0 {
plugin["fieldinclude"] = fieldinclude
}
var fieldexclude []string
if newFieldExclude, found := plugin["fieldexclude"]; found {
var err error
fieldexclude, err = migrations.AsStringSlice(newFieldExclude)
if err != nil {
return nil, "", fmt.Errorf("setting 'fieldexclude': %w", err)
}
}
for _, option := range []string{"drop", "fielddrop"} {
if rawOld, found := plugin[option]; found {
applied = true
old, err := migrations.AsStringSlice(rawOld)
if err != nil {
return nil, "", fmt.Errorf("setting '%s': %w", option, err)
}
for _, o := range old {
if !choice.Contains(o, fieldexclude) {
fieldexclude = append(fieldexclude, o)
}
}
// Remove the deprecated setting
delete(plugin, option)
}
}
// Add the new option if it has data
if len(fieldexclude) > 0 {
plugin["fieldexclude"] = fieldexclude
}
// No options migrated so we can exit early
if !applied {
return nil, "", migrations.ErrNotApplicable
}
// Create the corresponding plugin configurations
cfg := migrations.CreateTOMLStruct(category, name)
cfg.Add(category, name, plugin)
output, err := toml.Marshal(cfg)
return output, "", err
}
// Register the migration function for the plugin type
func init() {
migrations.AddGeneralMigration(migrate)
}

View File

@ -0,0 +1,96 @@
package general_metricfilter_test
import (
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
_ "github.com/influxdata/telegraf/migrations/general_metricfilter" // register migration
"github.com/influxdata/telegraf/plugins/inputs"
)
func TestNoMigration(t *testing.T) {
cfg := []byte(`
# Dummy plugin
[[inputs.dummy]]
## A dummy server
servers = ["tcp://127.0.0.1:1883"]
## A commented option
# timeout = "10s"
`)
// Migrate and check that nothing changed
output, n, err := config.ApplyMigrations(cfg)
require.NoError(t, err)
require.NotEmpty(t, output)
require.Zero(t, n)
require.Equal(t, string(cfg), string(output))
}
func TestCases(t *testing.T) {
// Get all directories in testdata
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
inputs.Add("dummy", func() telegraf.Input { return &MockupInputPlugin{} })
for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}
t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
inputFile := filepath.Join(testcasePath, "telegraf.conf")
expectedFile := filepath.Join(testcasePath, "expected.conf")
// Read the expected output
expected := config.NewConfig()
require.NoError(t, expected.LoadConfig(expectedFile))
require.NotEmpty(t, expected.Inputs)
// Read the input data
input, remote, err := config.LoadConfigFile(inputFile)
require.NoError(t, err)
require.False(t, remote)
require.NotEmpty(t, input)
// Migrate
output, n, err := config.ApplyMigrations(input)
require.NoError(t, err)
require.NotEmpty(t, output)
require.GreaterOrEqual(t, n, uint64(1))
actual := config.NewConfig()
require.NoError(t, actual.LoadConfigData(output))
// Test the output
require.Len(t, actual.Inputs, len(expected.Inputs))
actualIDs := make([]string, 0, len(expected.Inputs))
expectedIDs := make([]string, 0, len(expected.Inputs))
for i := range actual.Inputs {
actualIDs = append(actualIDs, actual.Inputs[i].ID())
expectedIDs = append(expectedIDs, expected.Inputs[i].ID())
}
require.ElementsMatch(t, expectedIDs, actualIDs, string(output))
})
}
}
// Implement a mock input plugin for testing
type MockupInputPlugin struct {
Servers []string `toml:"servers"`
Timeout config.Duration `toml:"timeout"`
}
func (m *MockupInputPlugin) SampleConfig() string {
return "Mockup test input plugin"
}
func (m *MockupInputPlugin) Gather(_ telegraf.Accumulator) error {
return nil
}

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldexclude = ["value"]

View File

@ -0,0 +1,10 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated drop
drop = ["value"]

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldexclude = ["value"]

View File

@ -0,0 +1,10 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated fielddrop
fielddrop = ["value"]

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldinclude = ["value"]

View File

@ -0,0 +1,10 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated fieldpass
fieldpass = ["value"]

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldinclude = ["value"]

View File

@ -0,0 +1,10 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated pass
pass = ["value"]

View File

@ -0,0 +1,4 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldexclude = ["bugA", "bugX", "bugB", "bugY", "bugC"]
fieldinclude = ["valueA", "valueX", "valueB", "valueY", "valueC"]

View File

@ -0,0 +1,15 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated field options
fieldinclude = ["valueA", "valueX"]
fieldexclude = ["bugA", "bugX"]
drop = ["bugB", "bugX", "bugY"]
pass = ["valueB", "valueX", "valueY"]
fieldpass = ["valueY", "valueC", "valueX"]
fielddrop = ["bugY", "bugC", "bugX"]

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldexclude = ["valueA", "valueB", "valueC"]

View File

@ -0,0 +1,12 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated field-exclude options
fieldexclude = ["valueA"]
drop = ["valueB"]
fielddrop = ["valueC"]

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldexclude = ["valueA", "valueX", "valueB", "valueY", "valueC"]

View File

@ -0,0 +1,12 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated field-exclude options
fieldexclude = ["valueA", "valueX"]
drop = ["valueB", "valueX", "valueY"]
fielddrop = ["valueY", "valueC", "valueX"]

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldinclude = ["valueA", "valueB", "valueC"]

View File

@ -0,0 +1,12 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated field-include options
fieldinclude = ["valueA"]
pass = ["valueB"]
fieldpass = ["valueC"]

View File

@ -0,0 +1,3 @@
[[inputs.dummy]]
servers = ["tcp://127.0.0.1:1883"]
fieldinclude = ["valueA", "valueX", "valueB", "valueY", "valueC"]

View File

@ -0,0 +1,12 @@
# A dummy plugin
[[inputs.dummy]]
## A server
servers = ["tcp://127.0.0.1:1883"]
## Default timestamp
# timestamp = "10s"
## Deprecated field-include options
fieldinclude = ["valueA", "valueX"]
pass = ["valueB", "valueX", "valueY"]
fieldpass = ["valueY", "valueC", "valueX"]

View File

@ -24,36 +24,20 @@ func migrate(tbl *ast.Table) ([]byte, string, error) {
applied = true
// Convert the options to the actual type
deprecatedMountpoints, ok := rawDeprecatedMountpoints.([]interface{})
if !ok {
err := fmt.Errorf("unexpected type for deprecated 'mountpoints' option: %T", rawDeprecatedMountpoints)
return nil, "", err
deprecatedMountpoints, err := migrations.AsStringSlice(rawDeprecatedMountpoints)
if err != nil {
return nil, "", fmt.Errorf("'mountpoints' option: %w", err)
}
// Merge the option with the replacement
var mountpoints []string
if rawMountpoints, found := plugin["mount_points"]; found {
mountpointsList, ok := rawMountpoints.([]interface{})
if !ok {
err := fmt.Errorf("unexpected type for 'mount_points' option: %T", rawMountpoints)
return nil, "", err
}
for _, raw := range mountpointsList {
mp, ok := raw.(string)
if !ok {
err := fmt.Errorf("unexpected type for 'mount_points' option: %T", raw)
return nil, "", err
}
mountpoints = append(mountpoints, mp)
mountpoints, err = migrations.AsStringSlice(rawMountpoints)
if err != nil {
return nil, "", fmt.Errorf("'mount_points' option: %w", err)
}
}
for _, raw := range deprecatedMountpoints {
dmp, ok := raw.(string)
if !ok {
err := fmt.Errorf("unexpected type for deprecated 'mountpoints' option: %T", raw)
return nil, "", err
}
for _, dmp := range deprecatedMountpoints {
if !choice.Contains(dmp, mountpoints) {
mountpoints = append(mountpoints, dmp)
}

View File

@ -20,33 +20,23 @@ func migrate(tbl *ast.Table) ([]byte, string, error) {
// Check for deprecated option(s) and migrate them
var applied bool
if oldUnits, found := plugin["supervisor_unit"]; found {
if rawOldUnits, found := plugin["supervisor_unit"]; found {
applied = true
// Check if the new option already exists and merge the two
var units []string
if newUnits, found := plugin["supervisor_units"]; found {
nu, ok := newUnits.([]interface{})
if !ok {
return nil, "", fmt.Errorf("setting 'supervisor_units' has wrong type %T", newUnits)
}
for _, raw := range nu {
u, ok := raw.(string)
if !ok {
return nil, "", fmt.Errorf("setting 'supervisor_units' contains wrong type %T", raw)
}
units = append(units, u)
var err error
units, err = migrations.AsStringSlice(newUnits)
if err != nil {
return nil, "", fmt.Errorf("setting 'supervisor_units': %w", err)
}
}
ou, ok := oldUnits.([]interface{})
if !ok {
return nil, "", fmt.Errorf("setting 'supervisor_unit' has wrong type %T", oldUnits)
}
for _, raw := range ou {
u, ok := raw.(string)
if !ok {
return nil, "", fmt.Errorf("setting 'supervisor_unit' contains wrong type %T", raw)
oldUnits, err := migrations.AsStringSlice(rawOldUnits)
if err != nil {
return nil, "", fmt.Errorf("setting 'supervisor_unit': %w", err)
}
for _, u := range oldUnits {
if !choice.Contains(u, units) {
units = append(units, u)
}
@ -59,20 +49,11 @@ func migrate(tbl *ast.Table) ([]byte, string, error) {
// The tagging options both need the 'tag_with' setting
var tagwith []string
newTagWith, found := plugin["tag_with"]
if found {
ntw, ok := newTagWith.([]interface{})
if !ok {
return nil, "", fmt.Errorf("setting 'tag_with' has wrong type %T", newTagWith)
}
for _, raw := range ntw {
s, ok := raw.(string)
if !ok {
return nil, "", fmt.Errorf("setting 'tag_with' contains wrong type %T", raw)
}
if !choice.Contains(s, tagwith) {
tagwith = append(tagwith, s)
}
if rawNewTagWith, found := plugin["tag_with"]; found {
var err error
tagwith, err = migrations.AsStringSlice(rawNewTagWith)
if err != nil {
return nil, "", fmt.Errorf("setting 'tag_with': %w", err)
}
}

View File

@ -1,7 +1,6 @@
package outputs_influxdb
import (
"errors"
"fmt"
"github.com/influxdata/toml"
@ -27,16 +26,10 @@ func migrate(tbl *ast.Table) ([]byte, string, error) {
var urls []string
// Merge the old URL and the new URLs with deduplication
if newURLs, found := plugin["urls"]; found {
list, ok := newURLs.([]interface{})
if !ok {
return nil, "", errors.New("'urls' setting is not a list")
}
for _, raw := range list {
nu, ok := raw.(string)
if !ok {
return nil, "", fmt.Errorf("unexpected 'urls' entry %v (%T)", raw, raw)
}
urls = append(urls, nu)
var err error
urls, err = migrations.AsStringSlice(newURLs)
if err != nil {
return nil, "", fmt.Errorf("'urls' setting: %w", err)
}
}
ou, ok := oldURL.(string)

View File

@ -31,6 +31,14 @@ func AddPluginOptionMigration(name string, f PluginOptionMigrationFunc) {
PluginOptionMigrations[name] = f
}
type GeneralMigrationFunc func(string, string, *ast.Table) ([]byte, string, error)
var GeneralMigrations []GeneralMigrationFunc
func AddGeneralMigration(f GeneralMigrationFunc) {
GeneralMigrations = append(GeneralMigrations, f)
}
type pluginTOMLStruct map[string]map[string][]interface{}
func CreateTOMLStruct(category, name string) pluginTOMLStruct {

22
migrations/utils.go Normal file
View File

@ -0,0 +1,22 @@
package migrations
import (
"fmt"
)
func AsStringSlice(raw interface{}) ([]string, error) {
rawList, ok := raw.([]interface{})
if !ok {
return nil, fmt.Errorf("unexpected type : %T", raw)
}
converted := make([]string, 0, len(rawList))
for _, rawElement := range rawList {
el, ok := rawElement.(string)
if !ok {
return nil, fmt.Errorf("unexpected type for list element: %T", rawElement)
}
converted = append(converted, el)
}
return converted, nil
}