diff --git a/migrations/all/inputs_mqtt_consumer.go b/migrations/all/inputs_mqtt_consumer.go new file mode 100644 index 000000000..b5ed6db51 --- /dev/null +++ b/migrations/all/inputs_mqtt_consumer.go @@ -0,0 +1,5 @@ +//go:build !custom || (migrations && (inputs || inputs.mqtt_consumer)) + +package all + +import _ "github.com/influxdata/telegraf/migrations/inputs_mqtt_consumer" // register migration diff --git a/migrations/inputs_mqtt_consumer/migration.go b/migrations/inputs_mqtt_consumer/migration.go new file mode 100644 index 000000000..9137af590 --- /dev/null +++ b/migrations/inputs_mqtt_consumer/migration.go @@ -0,0 +1,43 @@ +package inputs_mqtt_consumer + +import ( + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + + "github.com/influxdata/telegraf/migrations" +) + +// Migration function +func migrate(tbl *ast.Table) ([]byte, string, error) { + // Decode the old data structure + var plugin map[string]interface{} + if err := toml.UnmarshalTable(tbl, &plugin); err != nil { + return nil, "", err + } + + // Check for deprecated option(s) and migrate them + var applied bool + if _, found := plugin["metric_buffer"]; found { + applied = true + + // Remove the ignored setting + delete(plugin, "metric_buffer") + } + + // No options migrated so we can exit early + if !applied { + return nil, "", migrations.ErrNotApplicable + } + + // Create the corresponding plugin configurations + cfg := migrations.CreateTOMLStruct("inputs", "mqtt_consumer") + cfg.Add("inputs", "mqtt_consumer", plugin) + + output, err := toml.Marshal(cfg) + return output, "", err +} + +// Register the migration function for the plugin type +func init() { + migrations.AddPluginOptionMigration("inputs.mqtt_consumer", migrate) +} diff --git a/migrations/inputs_mqtt_consumer/migration_test.go b/migrations/inputs_mqtt_consumer/migration_test.go new file mode 100644 index 000000000..a1494e3cf --- /dev/null +++ b/migrations/inputs_mqtt_consumer/migration_test.go @@ -0,0 +1,161 @@ +package inputs_mqtt_consumer_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + _ "github.com/influxdata/telegraf/migrations/inputs_mqtt_consumer" // register migration + _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" // register plugin + _ "github.com/influxdata/telegraf/plugins/parsers/all" // register parsers +) + +func TestNoMigration(t *testing.T) { + defaultCfg := []byte(` + # Read metrics from MQTT topic(s) + [[inputs.mqtt_consumer]] + ## Broker URLs for the MQTT server or cluster. To connect to multiple + ## clusters or standalone servers, use a separate plugin instance. + ## example: servers = ["tcp://localhost:1883"] + ## servers = ["ssl://localhost:1883"] + ## servers = ["ws://localhost:1883"] + servers = ["tcp://127.0.0.1:1883"] + + ## Topics that will be subscribed to. + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + + ## The message topic will be stored in a tag specified by this value. If set + ## to the empty string no topic tag will be created. + # topic_tag = "topic" + + ## QoS policy for messages + ## 0 = at most once + ## 1 = at least once + ## 2 = exactly once + ## + ## When using a QoS of 1 or 2, you should enable persistent_session to allow + ## resuming unacknowledged messages. + # qos = 0 + + ## Connection timeout for initial connection in seconds + # connection_timeout = "30s" + + ## Max undelivered messages + ## This plugin uses tracking metrics, which ensure messages are read to + ## outputs before acknowledging them to the original broker to ensure data + ## is not lost. This option sets the maximum messages to read from the + ## broker that have not been written by an output. + ## + ## This value needs to be picked with awareness of the agent's + ## metric_batch_size value as well. Setting max undelivered messages too high + ## can result in a constant stream of data batches to the output. While + ## setting it too low may never flush the broker's messages. + # max_undelivered_messages = 1000 + + ## Persistent session disables clearing of the client session on connection. + ## In order for this option to work you must also set client_id to identify + ## the client. To receive messages that arrived while the client is offline, + ## also set the qos option to 1 or 2 and don't forget to also set the QoS when + ## publishing. + # persistent_session = false + + ## If unset, a random client ID will be generated. + # client_id = "" + + ## Username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Client trace messages + ## When set to true, and debug mode enabled in the agent settings, the MQTT + ## client's messages are included in telegraf logs. These messages are very + ## noisey, but essential for debugging issues. + # client_trace = false + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Enable extracting tag values from MQTT topics + ## _ denotes an ignored entry in the topic path + # [[inputs.mqtt_consumer.topic_parsing]] + # topic = "" + # measurement = "" + # tags = "" + # fields = "" + ## Value supported is int, float, unit + # [[inputs.mqtt_consumer.topic.types]] + # key = type +`) + + // Migrate and check that nothing changed + output, n, err := config.ApplyMigrations(defaultCfg) + require.NoError(t, err) + require.NotEmpty(t, output) + require.Zero(t, n) + require.Equal(t, string(defaultCfg), string(output)) +} + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + inputFile := filepath.Join(testcasePath, "telegraf.conf") + expectedFile := filepath.Join(testcasePath, "expected.conf") + + // Read the expected output + expected := config.NewConfig() + require.NoError(t, expected.LoadConfig(expectedFile)) + require.NotEmpty(t, expected.Inputs) + + // Read the input data + input, remote, err := config.LoadConfigFile(inputFile) + require.NoError(t, err) + require.False(t, remote) + require.NotEmpty(t, input) + + // Migrate + output, n, err := config.ApplyMigrations(input) + require.NoError(t, err) + require.NotEmpty(t, output) + require.GreaterOrEqual(t, n, uint64(1)) + actual := config.NewConfig() + require.NoError(t, actual.LoadConfigData(output)) + + // Test the output + require.Len(t, actual.Inputs, len(expected.Inputs)) + actualIDs := make([]string, 0, len(expected.Inputs)) + expectedIDs := make([]string, 0, len(expected.Inputs)) + for i := range actual.Inputs { + actualIDs = append(actualIDs, actual.Inputs[i].ID()) + expectedIDs = append(expectedIDs, expected.Inputs[i].ID()) + } + require.ElementsMatch(t, expectedIDs, actualIDs, string(output)) + }) + } +} diff --git a/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer/expected.conf b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer/expected.conf new file mode 100644 index 000000000..558bffa51 --- /dev/null +++ b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer/expected.conf @@ -0,0 +1,4 @@ +[[inputs.mqtt_consumer]] +data_format = "influx" +servers = ["tcp://127.0.0.1:1883"] +topics = ["telegraf/host01/cpu", "telegraf/+/mem", "sensors/#"] diff --git a/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer/telegraf.conf b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer/telegraf.conf new file mode 100644 index 000000000..8bf21f8b7 --- /dev/null +++ b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer/telegraf.conf @@ -0,0 +1,10 @@ +# Read metrics from MQTT topic(s) +[[inputs.mqtt_consumer]] + servers = ["tcp://127.0.0.1:1883"] + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + metric_buffer = 1024 + data_format = "influx" diff --git a/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer_parser/expected.conf b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer_parser/expected.conf new file mode 100644 index 000000000..bee249b3f --- /dev/null +++ b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer_parser/expected.conf @@ -0,0 +1,12 @@ +[[inputs.mqtt_consumer]] +data_format = "xpath_json" +servers = ["tcp://127.0.0.1:1883"] +topics = ["telegraf/host01/cpu", "telegraf/+/mem", "sensors/#"] +xpath_native_types = true + +[[inputs.mqtt_consumer.xpath]] +field_selection = "/fields/*" +metric_name = "/name" +tag_selection = "/tags/*" +timestamp = "/timestamp" +timestamp_format = "unix_ms" \ No newline at end of file diff --git a/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf new file mode 100644 index 000000000..1066e0097 --- /dev/null +++ b/migrations/inputs_mqtt_consumer/testcases/deprecated_metric_buffer_parser/telegraf.conf @@ -0,0 +1,20 @@ +# Read metrics from MQTT topic(s) +[[inputs.mqtt_consumer]] + servers = ["tcp://127.0.0.1:1883"] + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + metric_buffer = 1024 + + data_format = "xpath_json" + xpath_native_types = true + + # Configuration matching the first (ENERGY) message + [[inputs.mqtt_consumer.xpath]] + metric_name = "/name" + timestamp = "/timestamp" + timestamp_format = "unix_ms" + field_selection = "/fields/*" + tag_selection = "/tags/*" \ No newline at end of file