diff --git a/migrations/all/inputs_procstat.go b/migrations/all/inputs_procstat.go new file mode 100644 index 000000000..29dbdc573 --- /dev/null +++ b/migrations/all/inputs_procstat.go @@ -0,0 +1,5 @@ +//go:build !custom || (migrations && (inputs || inputs.procstat)) + +package all + +import _ "github.com/influxdata/telegraf/migrations/inputs_procstat" // register migration diff --git a/migrations/inputs_procstat/migration.go b/migrations/inputs_procstat/migration.go new file mode 100644 index 000000000..0826b8803 --- /dev/null +++ b/migrations/inputs_procstat/migration.go @@ -0,0 +1,76 @@ +package inputs_procstat + +import ( + "fmt" + + "github.com/influxdata/toml" + "github.com/influxdata/toml/ast" + + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/migrations" +) + +// Migration function +func migrate(tbl *ast.Table) ([]byte, string, error) { + // Decode the old data structure + var plugin map[string]interface{} + if err := toml.UnmarshalTable(tbl, &plugin); err != nil { + return nil, "", err + } + + // Check for deprecated option(s) and migrate them + var applied bool + if oldUnits, found := plugin["supervisor_unit"]; found { + applied = true + + // Check if the new option already exists and merge the two + var units []string + if newUnits, found := plugin["supervisor_units"]; found { + nu, ok := newUnits.([]interface{}) + if !ok { + return nil, "", fmt.Errorf("setting 'supervisor_units' has wrong type %T", newUnits) + } + for _, raw := range nu { + u, ok := raw.(string) + if !ok { + return nil, "", fmt.Errorf("setting 'supervisor_units' contains wrong type %T", raw) + } + units = append(units, u) + } + } + ou, ok := oldUnits.([]interface{}) + if !ok { + return nil, "", fmt.Errorf("setting 'supervisor_unit' has wrong type %T", oldUnits) + } + for _, raw := range ou { + u, ok := raw.(string) + if !ok { + return nil, "", fmt.Errorf("setting 'supervisor_unit' contains wrong type %T", raw) + } + if !choice.Contains(u, units) { + units = append(units, u) + } + } + plugin["supervisor_units"] = units + + // Remove deprecated setting + delete(plugin, "supervisor_unit") + } + + // No options migrated so we can exit early + if !applied { + return nil, "", migrations.ErrNotApplicable + } + + // Create the corresponding plugin configurations + cfg := migrations.CreateTOMLStruct("inputs", "procstat") + cfg.Add("inputs", "procstat", plugin) + + output, err := toml.Marshal(cfg) + return output, "", err +} + +// Register the migration function for the plugin type +func init() { + migrations.AddPluginOptionMigration("inputs.procstat", migrate) +} diff --git a/migrations/inputs_procstat/migration_test.go b/migrations/inputs_procstat/migration_test.go new file mode 100644 index 000000000..0dc5cca88 --- /dev/null +++ b/migrations/inputs_procstat/migration_test.go @@ -0,0 +1,160 @@ +package inputs_procstat_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + _ "github.com/influxdata/telegraf/migrations/inputs_procstat" // register migration + _ "github.com/influxdata/telegraf/plugins/inputs/procstat" // register plugin +) + +func TestNoMigration(t *testing.T) { + defaultCfg := []byte(` + # Read metrics from MQTT topic(s) + [[inputs.mqtt_consumer]] + ## Broker URLs for the MQTT server or cluster. To connect to multiple + ## clusters or standalone servers, use a separate plugin instance. + ## example: servers = ["tcp://localhost:1883"] + ## servers = ["ssl://localhost:1883"] + ## servers = ["ws://localhost:1883"] + servers = ["tcp://127.0.0.1:1883"] + + ## Topics that will be subscribed to. + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + + ## The message topic will be stored in a tag specified by this value. If set + ## to the empty string no topic tag will be created. + # topic_tag = "topic" + + ## QoS policy for messages + ## 0 = at most once + ## 1 = at least once + ## 2 = exactly once + ## + ## When using a QoS of 1 or 2, you should enable persistent_session to allow + ## resuming unacknowledged messages. + # qos = 0 + + ## Connection timeout for initial connection in seconds + # connection_timeout = "30s" + + ## Max undelivered messages + ## This plugin uses tracking metrics, which ensure messages are read to + ## outputs before acknowledging them to the original broker to ensure data + ## is not lost. This option sets the maximum messages to read from the + ## broker that have not been written by an output. + ## + ## This value needs to be picked with awareness of the agent's + ## metric_batch_size value as well. Setting max undelivered messages too high + ## can result in a constant stream of data batches to the output. While + ## setting it too low may never flush the broker's messages. + # max_undelivered_messages = 1000 + + ## Persistent session disables clearing of the client session on connection. + ## In order for this option to work you must also set client_id to identify + ## the client. To receive messages that arrived while the client is offline, + ## also set the qos option to 1 or 2 and don't forget to also set the QoS when + ## publishing. + # persistent_session = false + + ## If unset, a random client ID will be generated. + # client_id = "" + + ## Username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Client trace messages + ## When set to true, and debug mode enabled in the agent settings, the MQTT + ## client's messages are included in telegraf logs. These messages are very + ## noisey, but essential for debugging issues. + # client_trace = false + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" + + ## Enable extracting tag values from MQTT topics + ## _ denotes an ignored entry in the topic path + # [[inputs.mqtt_consumer.topic_parsing]] + # topic = "" + # measurement = "" + # tags = "" + # fields = "" + ## Value supported is int, float, unit + # [[inputs.mqtt_consumer.topic.types]] + # key = type +`) + + // Migrate and check that nothing changed + output, n, err := config.ApplyMigrations(defaultCfg) + require.NoError(t, err) + require.NotEmpty(t, output) + require.Zero(t, n) + require.Equal(t, string(defaultCfg), string(output)) +} + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + inputFile := filepath.Join(testcasePath, "telegraf.conf") + expectedFile := filepath.Join(testcasePath, "expected.conf") + + // Read the expected output + expected := config.NewConfig() + require.NoError(t, expected.LoadConfig(expectedFile)) + require.NotEmpty(t, expected.Inputs) + + // Read the input data + input, remote, err := config.LoadConfigFile(inputFile) + require.NoError(t, err) + require.False(t, remote) + require.NotEmpty(t, input) + + // Migrate + output, n, err := config.ApplyMigrations(input) + require.NoError(t, err) + require.NotEmpty(t, output) + require.GreaterOrEqual(t, n, uint64(1)) + actual := config.NewConfig() + require.NoError(t, actual.LoadConfigData(output)) + + // Test the output + require.Len(t, actual.Inputs, len(expected.Inputs)) + actualIDs := make([]string, 0, len(expected.Inputs)) + expectedIDs := make([]string, 0, len(expected.Inputs)) + for i := range actual.Inputs { + actualIDs = append(actualIDs, actual.Inputs[i].ID()) + expectedIDs = append(expectedIDs, expected.Inputs[i].ID()) + } + require.ElementsMatch(t, expectedIDs, actualIDs, string(output)) + }) + } +} diff --git a/migrations/inputs_procstat/testcases/deprecated_supervisor_unit merge/expected.conf b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit merge/expected.conf new file mode 100644 index 000000000..65bedc715 --- /dev/null +++ b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit merge/expected.conf @@ -0,0 +1,2 @@ +[[inputs.procstat]] +supervisor_units = ["upsd", "webserver", "mail", "proxy"] diff --git a/migrations/inputs_procstat/testcases/deprecated_supervisor_unit merge/telegraf.conf b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit merge/telegraf.conf new file mode 100644 index 000000000..20c4efbff --- /dev/null +++ b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit merge/telegraf.conf @@ -0,0 +1,3 @@ +[[inputs.procstat]] + supervisor_unit = ["webserver", "proxy"] + supervisor_units = ["upsd", "webserver", "mail"] diff --git a/migrations/inputs_procstat/testcases/deprecated_supervisor_unit/expected.conf b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit/expected.conf new file mode 100644 index 000000000..a31601c66 --- /dev/null +++ b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit/expected.conf @@ -0,0 +1,3 @@ +[[inputs.procstat]] +pid_file = "/var/run/nginx.pid" +supervisor_units = ["webserver", "proxy"] diff --git a/migrations/inputs_procstat/testcases/deprecated_supervisor_unit/telegraf.conf b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit/telegraf.conf new file mode 100644 index 000000000..9e8099965 --- /dev/null +++ b/migrations/inputs_procstat/testcases/deprecated_supervisor_unit/telegraf.conf @@ -0,0 +1,47 @@ +# Monitor process cpu and memory usage +[[inputs.procstat]] + ## PID file to monitor process + pid_file = "/var/run/nginx.pid" + ## executable name (ie, pgrep ) + # exe = "nginx" + ## pattern as argument for pgrep (ie, pgrep -f ) + # pattern = "nginx" + ## user as argument for pgrep (ie, pgrep -u ) + # user = "nginx" + ## Systemd unit name, supports globs when include_systemd_children is set to true + # systemd_unit = "nginx.service" + # include_systemd_children = false + ## CGroup name or path, supports globs + # cgroup = "systemd/system.slice/nginx.service" + ## Supervisor service names of hypervisorctl management + supervisor_unit = ["webserver", "proxy"] + + ## Windows service name + # win_service = "" + + ## override for process_name + ## This is optional; default is sourced from /proc//status + # process_name = "bar" + + ## Field name prefix + # prefix = "" + + ## When true add the full cmdline as a tag. + # cmdline_tag = false + + ## Mode to use when calculating CPU usage. Can be one of 'solaris' or 'irix'. + # mode = "irix" + + ## Add the PID as a tag instead of as a field. When collecting multiple + ## processes with otherwise matching tags this setting should be enabled to + ## ensure each process has a unique identity. + ## + ## Enabling this option may result in a large number of series, especially + ## when processes have a short lifetime. + # pid_tag = false + + ## Method to use when finding process IDs. Can be one of 'pgrep', or + ## 'native'. The pgrep finder calls the pgrep executable in the PATH while + ## the native finder performs the search directly in a manor dependent on the + ## platform. Default is 'pgrep' + # pid_finder = "pgrep" diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index a962be8da..380210c55 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -15,6 +15,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -36,7 +37,8 @@ type Procstat struct { ProcessName string `toml:"process_name"` User string `toml:"user"` SystemdUnits string `toml:"systemd_units"` - SupervisorUnit []string `toml:"supervisor_unit"` + SupervisorUnit []string `toml:"supervisor_unit" deprecated:"1.29.0;use 'supervisor_units' instead"` + SupervisorUnits []string `toml:"supervisor_units"` IncludeSystemdChildren bool `toml:"include_systemd_children"` CGroup string `toml:"cgroup"` PidTag bool `toml:"pid_tag"` @@ -64,9 +66,16 @@ func (p *Procstat) Init() error { // Check solaris mode p.solarisMode = strings.ToLower(p.Mode) == "solaris" + // Keep the old settings for compatibility + for _, u := range p.SupervisorUnit { + if !choice.Contains(u, p.SupervisorUnits) { + p.SupervisorUnits = append(p.SupervisorUnits, u) + } + } + // Check filtering switch { - case len(p.SupervisorUnit) > 0, p.SystemdUnits != "", p.WinService != "", + case len(p.SupervisorUnits) > 0, p.SystemdUnits != "", p.WinService != "", p.CGroup != "", p.PidFile != "", p.Exe != "", p.Pattern != "", p.User != "": // Do nothing as those are valid settings @@ -86,7 +95,7 @@ func (p *Procstat) Init() error { case "native": // gopsutil relies on pgrep when looking up children on darwin // see https://github.com/shirou/gopsutil/blob/v3.23.10/process/process_darwin.go#L235 - requiresChildren := len(p.SupervisorUnit) > 0 && p.Pattern != "" + requiresChildren := len(p.SupervisorUnits) > 0 && p.Pattern != "" if requiresChildren && runtime.GOOS == "darwin" { return errors.New("configuration requires the 'pgrep' finder on you OS") } @@ -124,7 +133,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error { var count int running := make(map[PID]bool) for _, r := range results { - if len(r.PIDs) < 1 && len(p.SupervisorUnit) > 0 { + if len(r.PIDs) < 1 && len(p.SupervisorUnits) > 0 { continue } count += len(r.PIDs) @@ -183,8 +192,8 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error { "pid_finder": p.PidFinder, "result": "success", } - if len(p.SupervisorUnit) > 0 { - tags["supervisor_unit"] = strings.Join(p.SupervisorUnit, ";") + if len(p.SupervisorUnits) > 0 { + tags["supervisor_unit"] = strings.Join(p.SupervisorUnits, ";") } acc.AddFields("procstat_lookup", fields, tags, now) @@ -194,7 +203,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error { // Get matching PIDs and their initial tags func (p *Procstat) findPids() ([]PidsTags, error) { switch { - case len(p.SupervisorUnit) > 0: + case len(p.SupervisorUnits) > 0: return p.findSupervisorUnits() case p.SystemdUnits != "": return p.systemdUnitPIDs() @@ -286,7 +295,7 @@ func (p *Procstat) findSupervisorUnits() ([]PidsTags, error) { } func (p *Procstat) supervisorPIDs() ([]string, map[string]map[string]string, error) { - out, err := execCommand("supervisorctl", "status", strings.Join(p.SupervisorUnit, " ")).Output() + out, err := execCommand("supervisorctl", "status", strings.Join(p.SupervisorUnits, " ")).Output() if err != nil { if !strings.Contains(err.Error(), "exit status 3") { return nil, nil, err @@ -326,7 +335,7 @@ func (p *Procstat) supervisorPIDs() ([]string, map[string]map[string]string, err mainPids[name] = statusMap } - return p.SupervisorUnit, mainPids, nil + return p.SupervisorUnits, mainPids, nil } func (p *Procstat) systemdUnitPIDs() ([]PidsTags, error) { diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index e7e24c386..e0181d62b 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -202,10 +202,10 @@ func TestInitRequiresChildDarwin(t *testing.T) { } p := Procstat{ - Pattern: "somepattern", - SupervisorUnit: []string{"a_unit"}, - PidFinder: "native", - Log: testutil.Logger{}, + Pattern: "somepattern", + SupervisorUnits: []string{"a_unit"}, + PidFinder: "native", + Log: testutil.Logger{}, } require.ErrorContains(t, p.Init(), "requires the 'pgrep' finder") } @@ -513,10 +513,10 @@ func TestGather_SameTimestamps(t *testing.T) { func TestGather_supervisorUnitPIDs(t *testing.T) { p := Procstat{ - SupervisorUnit: []string{"TestGather_supervisorUnitPIDs"}, - PidFinder: "test", - Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + SupervisorUnits: []string{"TestGather_supervisorUnitPIDs"}, + PidFinder: "test", + Log: testutil.Logger{}, + finder: newTestFinder([]PID{pid}), } require.NoError(t, p.Init()) @@ -530,10 +530,10 @@ func TestGather_supervisorUnitPIDs(t *testing.T) { func TestGather_MoresupervisorUnitPIDs(t *testing.T) { p := Procstat{ - SupervisorUnit: []string{"TestGather_STARTINGsupervisorUnitPIDs", "TestGather_FATALsupervisorUnitPIDs"}, - PidFinder: "test", - Log: testutil.Logger{}, - finder: newTestFinder([]PID{pid}), + SupervisorUnits: []string{"TestGather_STARTINGsupervisorUnitPIDs", "TestGather_FATALsupervisorUnitPIDs"}, + PidFinder: "test", + Log: testutil.Logger{}, + finder: newTestFinder([]PID{pid}), } require.NoError(t, p.Init())