fix(inputs.systemd_units): Revert to only gather loaded units by default (#15108)

This commit is contained in:
Sven Rebhan 2024-04-12 16:19:23 -04:00 committed by GitHub
parent 8d8ee533ce
commit 084d49dc9b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 181 additions and 128 deletions

View File

@ -1,6 +1,18 @@
<!-- markdownlint-disable MD024 -->
# Changelog
## Unreleased
### Important Changes
- [PR #15108](https://github.com/influxdata/telegraf/pull/15108) reverts the
behavior of `inputs.systemd_units` back to pre-v1.30.0 to only collect units
already loaded by systemd, i.e. not collecting disabled or static units. This
was necessary because using unspecific filters will cause significant load on
the system as systemd needs to read all unit-files matching the pattern in
each gather cycle. If you use specific patterns and want to collect non-loaded
units, please set the `collect_disabled_units` option to `true`.
## v1.30.1 [2024-04-01]
### Bugfixes

View File

@ -31,6 +31,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## automount, swap, timer, path, slice and scope
# unittype = "service"
## Collect also units not loaded by systemd, i.e. disabled or static units
## Enabling this feature might introduce significant load when used with
## unspecific patterns (such as '*') as systemd will need to load all
## matching unit files.
# collect_disabled_units = false
## Collect detailed information for the units
# details = false

View File

@ -12,6 +12,12 @@
## automount, swap, timer, path, slice and scope
# unittype = "service"
## Collect also units not loaded by systemd, i.e. disabled or static units
## Enabling this feature might introduce significant load when used with
## unspecific patterns (such as '*') as systemd will need to load all
## matching unit files.
# collect_disabled_units = false
## Collect detailed information for the units
# details = false

View File

@ -15,11 +15,12 @@ var sampleConfig string
// SystemdUnits is a telegraf plugin to gather systemd unit status
type SystemdUnits struct {
Pattern string `toml:"pattern"`
UnitType string `toml:"unittype"`
Details bool `toml:"details"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
Pattern string `toml:"pattern"`
UnitType string `toml:"unittype"`
Details bool `toml:"details"`
CollectDisabled bool `toml:"collect_disabled_units"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
archParams
}

View File

@ -115,14 +115,6 @@ var subMap = map[string]int{
"elapsed": 0x00a0,
}
type unitInfo struct {
name string
state dbus.UnitStatus
properties map[string]interface{}
unitFileState string
unitFilePreset string
}
type client interface {
Connected() bool
Close()
@ -135,9 +127,10 @@ type client interface {
}
type archParams struct {
client client
pattern []string
filter filter.Filter
client client
pattern []string
filter filter.Filter
unitTypeDBus string
}
func (s *SystemdUnits) Init() error {
@ -156,7 +149,7 @@ func (s *SystemdUnits) Init() error {
default:
return fmt.Errorf("invalid 'unittype' %q", s.UnitType)
}
s.UnitType = strings.ToUpper(s.UnitType[0:1]) + strings.ToLower(s.UnitType[1:])
s.unitTypeDBus = strings.ToUpper(s.UnitType[0:1]) + strings.ToLower(s.UnitType[1:])
s.pattern = strings.Split(s.Pattern, " ")
f, err := filter.Compile(s.pattern)
@ -205,15 +198,18 @@ func (s *SystemdUnits) Gather(acc telegraf.Accumulator) error {
return fmt.Errorf("listing loaded units failed: %w", err)
}
// List all unit files matching the pattern to also get disabled units
list := []string{"enabled", "disabled", "static"}
files, err := s.client.ListUnitFilesByPatternsContext(ctx, list, s.pattern)
if err != nil {
return fmt.Errorf("listing unit files failed: %w", err)
var files []dbus.UnitFile
if s.CollectDisabled {
// List all unit files matching the pattern to also get disabled units
list := []string{"enabled", "disabled", "static"}
files, err = s.client.ListUnitFilesByPatternsContext(ctx, list, s.pattern)
if err != nil {
return fmt.Errorf("listing unit files failed: %w", err)
}
}
// Collect all matching units, the loaded ones and the disabled ones
states := make([]dbus.UnitStatus, 0, len(files))
states := make([]dbus.UnitStatus, 0, len(loaded))
// Match all loaded units first
seen := make(map[string]bool)
@ -235,124 +231,95 @@ func (s *SystemdUnits) Gather(acc telegraf.Accumulator) error {
// Now split the unit-files into disabled ones and static ones, ignore
// enabled units as those are already contained in the "loaded" list.
disabled := make([]string, 0, len(files))
static := make([]string, 0, len(files))
for _, f := range files {
name := path.Base(f.Path)
if len(files) > 0 {
disabled := make([]string, 0, len(files))
static := make([]string, 0, len(files))
for _, f := range files {
name := path.Base(f.Path)
switch f.Type {
case "disabled":
if seen[name] {
continue
}
seen[name] = true
switch f.Type {
case "disabled":
if seen[name] {
continue
}
seen[name] = true
// Detect disabled multi-instance units and declare them as static
_, suffix, found := strings.Cut(name, "@")
instance, _, _ := strings.Cut(suffix, ".")
if found && instance == "" {
// Detect disabled multi-instance units and declare them as static
_, suffix, found := strings.Cut(name, "@")
instance, _, _ := strings.Cut(suffix, ".")
if found && instance == "" {
static = append(static, name)
continue
}
disabled = append(disabled, name)
case "static":
// Make sure we filter already loaded static multi-instance units
instance := name
if strings.Contains(name, "@") {
prefix, _, _ := strings.Cut(name, "@")
suffix := path.Ext(name)
instance = prefix + "@" + suffix
}
if seen[instance] || seen[name] {
continue
}
seen[instance] = true
static = append(static, name)
continue
}
disabled = append(disabled, name)
case "static":
// Make sure we filter already loaded static multi-instance units
instance := name
if strings.Contains(name, "@") {
prefix, _, _ := strings.Cut(name, "@")
suffix := path.Ext(name)
instance = prefix + "@" + suffix
}
if seen[instance] || seen[name] {
continue
}
seen[instance] = true
static = append(static, name)
}
}
// Resolve the disabled and remaining static units
disabledStates, err := s.client.ListUnitsByNamesContext(ctx, disabled)
if err != nil {
return fmt.Errorf("listing unit states failed: %w", err)
}
states = append(states, disabledStates...)
// Add special information about unused static units
for _, name := range static {
if !strings.EqualFold(strings.TrimPrefix(path.Ext(name), "."), s.UnitType) {
continue
}
states = append(states, dbus.UnitStatus{
Name: name,
LoadState: "stub",
ActiveState: "inactive",
SubState: "dead",
})
// Resolve the disabled and remaining static units
disabledStates, err := s.client.ListUnitsByNamesContext(ctx, disabled)
if err != nil {
return fmt.Errorf("listing unit states failed: %w", err)
}
states = append(states, disabledStates...)
// Add special information about unused static units
for _, name := range static {
if !strings.EqualFold(strings.TrimPrefix(path.Ext(name), "."), s.UnitType) {
continue
}
states = append(states, dbus.UnitStatus{
Name: name,
LoadState: "stub",
ActiveState: "inactive",
SubState: "dead",
})
}
}
// Merge the unit information into one struct
units := make([]unitInfo, 0, len(states))
for _, state := range states {
// Filter units of the wrong type
props, err := s.client.GetUnitTypePropertiesContext(ctx, state.Name, s.UnitType)
if err != nil {
// Skip units returning "Unknown interface" errors as those indicate
// that the unit is of the wrong type.
if strings.Contains(err.Error(), "Unknown interface") {
continue
}
// For other units we make up properties, usually those are
// disabled multi-instance units
props = map[string]interface{}{
"StatusErrno": int64(-1),
"NRestarts": uint64(0),
}
if idx := strings.LastIndex(state.Name, "."); idx < 0 || state.Name[idx+1:] != s.UnitType {
continue
}
u := unitInfo{
name: state.Name,
state: state,
properties: props,
}
// Get required unit file properties
if v, err := s.client.GetUnitPropertyContext(ctx, state.Name, "UnitFileState"); err == nil {
u.unitFileState = strings.Trim(v.Value.String(), `'"`)
}
if v, err := s.client.GetUnitPropertyContext(ctx, state.Name, "UnitFilePreset"); err == nil {
u.unitFilePreset = strings.Trim(v.Value.String(), `'"`)
}
units = append(units, u)
}
// Create the metrics
for _, u := range units {
// Map the state names to numerical values
load, ok := loadMap[u.state.LoadState]
load, ok := loadMap[state.LoadState]
if !ok {
acc.AddError(fmt.Errorf("parsing field 'load' failed, value not in map: %s", u.state.LoadState))
acc.AddError(fmt.Errorf("parsing field 'load' failed, value not in map: %s", state.LoadState))
continue
}
active, ok := activeMap[u.state.ActiveState]
active, ok := activeMap[state.ActiveState]
if !ok {
acc.AddError(fmt.Errorf("parsing field field 'active' failed, value not in map: %s", u.state.ActiveState))
acc.AddError(fmt.Errorf("parsing field 'active' failed, value not in map: %s", state.ActiveState))
continue
}
subState, ok := subMap[u.state.SubState]
subState, ok := subMap[state.SubState]
if !ok {
acc.AddError(fmt.Errorf("parsing field field 'sub' failed, value not in map: %s", u.state.SubState))
acc.AddError(fmt.Errorf("parsing field 'sub' failed, value not in map: %s", state.SubState))
continue
}
// Create the metric
tags := map[string]string{
"name": u.name,
"load": u.state.LoadState,
"active": u.state.ActiveState,
"sub": u.state.SubState,
"name": state.Name,
"load": state.LoadState,
"active": state.ActiveState,
"sub": state.SubState,
}
fields := map[string]interface{}{
@ -362,17 +329,42 @@ func (s *SystemdUnits) Gather(acc telegraf.Accumulator) error {
}
if s.Details {
tags["state"] = u.unitFileState
tags["preset"] = u.unitFilePreset
properties, err := s.client.GetUnitTypePropertiesContext(ctx, state.Name, s.unitTypeDBus)
if err != nil {
// Skip units returning "Unknown interface" errors as those indicate
// that the unit is of the wrong type.
if strings.Contains(err.Error(), "Unknown interface") {
continue
}
// For other units we make up properties, usually those are
// disabled multi-instance units
properties = map[string]interface{}{
"StatusErrno": int64(-1),
"NRestarts": uint64(0),
}
}
fields["status_errno"] = u.properties["StatusErrno"]
fields["restarts"] = u.properties["NRestarts"]
fields["pid"] = u.properties["MainPID"]
fields["mem_current"] = u.properties["MemoryCurrent"]
fields["mem_peak"] = u.properties["MemoryPeak"]
fields["swap_current"] = u.properties["MemorySwapCurrent"]
fields["swap_peak"] = u.properties["MemorySwapPeak"]
fields["mem_avail"] = u.properties["MemoryAvailable"]
// Get required unit file properties
var unitFileState string
if v, err := s.client.GetUnitPropertyContext(ctx, state.Name, "UnitFileState"); err == nil {
unitFileState = strings.Trim(v.Value.String(), `'"`)
}
var unitFilePreset string
if v, err := s.client.GetUnitPropertyContext(ctx, state.Name, "UnitFilePreset"); err == nil {
unitFilePreset = strings.Trim(v.Value.String(), `'"`)
}
tags["state"] = unitFileState
tags["preset"] = unitFilePreset
fields["status_errno"] = properties["StatusErrno"]
fields["restarts"] = properties["NRestarts"]
fields["pid"] = properties["MainPID"]
fields["mem_current"] = properties["MemoryCurrent"]
fields["mem_peak"] = properties["MemoryPeak"]
fields["swap_current"] = properties["MemorySwapCurrent"]
fields["swap_peak"] = properties["MemorySwapPeak"]
fields["mem_avail"] = properties["MemoryAvailable"]
// Sanitize unset memory fields
for k, value := range fields {

View File

@ -715,9 +715,10 @@ func TestMultiInstance(t *testing.T) {
// Setup plugin. Do NOT call Start() as this would connect to
// the real systemd daemon.
plugin := &SystemdUnits{
Pattern: tt.pattern,
Timeout: config.Duration(time.Second),
Log: testutil.Logger{},
Pattern: tt.pattern,
CollectDisabled: true,
Timeout: config.Duration(time.Second),
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())
@ -810,6 +811,41 @@ func TestMultiInstance(t *testing.T) {
}
}
func BenchmarkAllUnitsIntegration(b *testing.B) {
plugin := &SystemdUnits{
CollectDisabled: true,
Timeout: config.Duration(3 * time.Second),
}
require.NoError(b, plugin.Init())
acc := &testutil.Accumulator{Discard: true}
require.NoError(b, plugin.Start(acc))
require.NoError(b, acc.GatherError(plugin.Gather))
require.NotZero(b, acc.NMetrics())
b.Logf("produced %d metrics", acc.NMetrics())
for n := 0; n < b.N; n++ {
_ = plugin.Gather(acc)
}
}
func BenchmarkAllLoadedUnitsIntegration(b *testing.B) {
plugin := &SystemdUnits{
Timeout: config.Duration(3 * time.Second),
}
require.NoError(b, plugin.Init())
acc := &testutil.Accumulator{Discard: true}
require.NoError(b, plugin.Start(acc))
require.NoError(b, acc.GatherError(plugin.Gather))
require.NotZero(b, acc.NMetrics())
b.Logf("produced %d metrics", acc.NMetrics())
for n := 0; n < b.N; n++ {
_ = plugin.Gather(acc)
}
}
// Fake client implementation
type fakeClient struct {
units map[string]properties