feat(inputs.cgroups): do not abort on first error, print message once (#12342)

This commit is contained in:
Michael Hoffmann 2022-12-07 15:21:59 +01:00 committed by GitHub
parent 5cb40a1882
commit d7d1f8b3f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 359 additions and 268 deletions

View File

@ -14,12 +14,20 @@ var sampleConfig string
type CGroup struct { type CGroup struct {
Paths []string `toml:"paths"` Paths []string `toml:"paths"`
Files []string `toml:"files"` Files []string `toml:"files"`
logged map[string]bool
} }
func (*CGroup) SampleConfig() string { func (*CGroup) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (cg *CGroup) Init() error {
cg.logged = make(map[string]bool)
return nil
}
func init() { func init() {
inputs.Add("cgroup", func() telegraf.Input { return &CGroup{} }) inputs.Add("cgroup", func() telegraf.Input { return &CGroup{} })
} }

View File

@ -54,7 +54,11 @@ func (g *CGroup) gatherDir(acc telegraf.Accumulator, dir string) error {
fd := fileData{data: raw, path: file.path} fd := fileData{data: raw, path: file.path}
if err := fd.parse(fields); err != nil { if err := fd.parse(fields); err != nil {
return err if !g.logged[file.path] {
acc.AddError(err)
}
g.logged[file.path] = true
continue
} }
} }

View File

@ -4,12 +4,18 @@ package cgroup
import ( import (
"testing" "testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var cg1 = &CGroup{ func TestCgroupStatistics_1(t *testing.T) {
var acc testutil.Accumulator
var cg = &CGroup{
Paths: []string{"testdata/memory"}, Paths: []string{"testdata/memory"},
Files: []string{ Files: []string{
"memory.empty", "memory.empty",
@ -19,18 +25,16 @@ var cg1 = &CGroup{
"memory.use_hierarchy", "memory.use_hierarchy",
"notify_on_release", "notify_on_release",
}, },
}
func TestCgroupStatistics_1(t *testing.T) {
var acc testutil.Accumulator
err := acc.GatherError(cg1.Gather)
require.NoError(t, err)
tags := map[string]string{
"path": "testdata/memory",
} }
fields := map[string]interface{}{
require.NoError(t, acc.GatherError(cg.Gather))
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory",
},
map[string]interface{}{
"memory.stat.cache": int64(1739362304123123123), "memory.stat.cache": int64(1739362304123123123),
"memory.stat.rss": int64(1775325184), "memory.stat.rss": int64(1775325184),
"memory.stat.rss_huge": int64(778043392), "memory.stat.rss_huge": int64(778043392),
@ -42,162 +46,203 @@ func TestCgroupStatistics_1(t *testing.T) {
"memory.limit_in_bytes": int64(223372036854771712), "memory.limit_in_bytes": int64(223372036854771712),
"memory.use_hierarchy": "12-781", "memory.use_hierarchy": "12-781",
"notify_on_release": int64(0), "notify_on_release": int64(0),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
// ======================================================================
var cg2 = &CGroup{
Paths: []string{"testdata/cpu"},
Files: []string{"cpuacct.usage_percpu"},
} }
func TestCgroupStatistics_2(t *testing.T) { func TestCgroupStatistics_2(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(cg2.Gather) var cg = &CGroup{
require.NoError(t, err) Paths: []string{"testdata/cpu"},
Files: []string{"cpuacct.usage_percpu"},
tags := map[string]string{
"path": "testdata/cpu",
} }
fields := map[string]interface{}{
require.NoError(t, acc.GatherError(cg.Gather))
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{
"path": "testdata/cpu",
},
map[string]interface{}{
"cpuacct.usage_percpu.0": int64(-1452543795404), "cpuacct.usage_percpu.0": int64(-1452543795404),
"cpuacct.usage_percpu.1": int64(1376681271659), "cpuacct.usage_percpu.1": int64(1376681271659),
"cpuacct.usage_percpu.2": int64(1450950799997), "cpuacct.usage_percpu.2": int64(1450950799997),
"cpuacct.usage_percpu.3": int64(-1473113374257), "cpuacct.usage_percpu.3": int64(-1473113374257),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
// ======================================================================
var cg3 = &CGroup{
Paths: []string{"testdata/memory/*"},
Files: []string{"memory.limit_in_bytes"},
} }
func TestCgroupStatistics_3(t *testing.T) { func TestCgroupStatistics_3(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(cg3.Gather) var cg = &CGroup{
require.NoError(t, err) Paths: []string{"testdata/memory/*"},
Files: []string{"memory.limit_in_bytes"},
tags := map[string]string{
"path": "testdata/memory/group_1",
} }
fields := map[string]interface{}{ fields := map[string]interface{}{
"memory.limit_in_bytes": int64(223372036854771712), "memory.limit_in_bytes": int64(223372036854771712),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{ require.NoError(t, acc.GatherError(cg.Gather))
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_1",
},
fields,
time.Unix(0, 0),
),
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_2", "path": "testdata/memory/group_2",
},
fields,
time.Unix(0, 0),
),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
// ======================================================================
var cg4 = &CGroup{
Paths: []string{"testdata/memory/*/*", "testdata/memory/group_2"},
Files: []string{"memory.limit_in_bytes"},
} }
func TestCgroupStatistics_4(t *testing.T) { func TestCgroupStatistics_4(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(cg4.Gather) var cg = &CGroup{
require.NoError(t, err) Paths: []string{"testdata/memory/*/*", "testdata/memory/group_2"},
Files: []string{"memory.limit_in_bytes"},
tags := map[string]string{
"path": "testdata/memory/group_1/group_1_1",
} }
fields := map[string]interface{}{ fields := map[string]interface{}{
"memory.limit_in_bytes": int64(223372036854771712), "memory.limit_in_bytes": int64(223372036854771712),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags)
tags = map[string]string{ require.NoError(t, acc.GatherError(cg.Gather))
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_1/group_1_1",
},
fields,
time.Unix(0, 0),
),
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_1/group_1_2", "path": "testdata/memory/group_1/group_1_2",
} },
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) fields,
time.Unix(0, 0),
tags = map[string]string{ ),
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_2/group_1_1",
},
fields,
time.Unix(0, 0),
),
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_2", "path": "testdata/memory/group_2",
},
map[string]interface{}{
"memory.limit_in_bytes": int64(223372036854771712),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
// ======================================================================
var cg5 = &CGroup{
Paths: []string{"testdata/memory/*/group_1_1"},
Files: []string{"memory.limit_in_bytes"},
} }
func TestCgroupStatistics_5(t *testing.T) { func TestCgroupStatistics_5(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(cg5.Gather) var cg = &CGroup{
require.NoError(t, err) Paths: []string{"testdata/memory/*/group_1_1"},
Files: []string{"memory.limit_in_bytes"},
}
tags := map[string]string{ require.NoError(t, acc.GatherError(cg.Gather))
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_1/group_1_1", "path": "testdata/memory/group_1/group_1_1",
} },
fields := map[string]interface{}{ map[string]interface{}{
"memory.limit_in_bytes": int64(223372036854771712), "memory.limit_in_bytes": int64(223372036854771712),
} },
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) time.Unix(0, 0),
),
tags = map[string]string{ metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory/group_2/group_1_1", "path": "testdata/memory/group_2/group_1_1",
},
map[string]interface{}{
"memory.limit_in_bytes": int64(223372036854771712),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
// ======================================================================
var cg6 = &CGroup{
Paths: []string{"testdata/memory"},
Files: []string{"memory.us*", "*/memory.kmem.*"},
} }
func TestCgroupStatistics_6(t *testing.T) { func TestCgroupStatistics_6(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(cg6.Gather) var cg = &CGroup{
require.NoError(t, err) Paths: []string{"testdata/memory"},
Files: []string{"memory.us*", "*/memory.kmem.*"},
tags := map[string]string{
"path": "testdata/memory",
} }
fields := map[string]interface{}{
require.NoError(t, acc.GatherError(cg.Gather))
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{
"path": "testdata/memory",
},
map[string]interface{}{
"memory.usage_in_bytes": int64(3513667584), "memory.usage_in_bytes": int64(3513667584),
"memory.use_hierarchy": "12-781", "memory.use_hierarchy": "12-781",
"memory.kmem.limit_in_bytes": int64(9223372036854771712), "memory.kmem.limit_in_bytes": int64(9223372036854771712),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
// ======================================================================
var cg7 = &CGroup{
Paths: []string{"testdata/blkio"},
Files: []string{"blkio.throttle.io_serviced"},
} }
func TestCgroupStatistics_7(t *testing.T) { func TestCgroupStatistics_7(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := acc.GatherError(cg7.Gather) var cg = &CGroup{
require.NoError(t, err) Paths: []string{"testdata/blkio"},
Files: []string{"blkio.throttle.io_serviced"},
tags := map[string]string{
"path": "testdata/blkio",
} }
fields := map[string]interface{}{
require.NoError(t, acc.GatherError(cg.Gather))
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{
"path": "testdata/blkio",
},
map[string]interface{}{
"blkio.throttle.io_serviced.11:0.Read": int64(0), "blkio.throttle.io_serviced.11:0.Read": int64(0),
"blkio.throttle.io_serviced.11:0.Write": int64(0), "blkio.throttle.io_serviced.11:0.Write": int64(0),
"blkio.throttle.io_serviced.11:0.Sync": int64(0), "blkio.throttle.io_serviced.11:0.Sync": int64(0),
@ -329,6 +374,38 @@ func TestCgroupStatistics_7(t *testing.T) {
"blkio.throttle.io_serviced.1:0.Async": int64(3), "blkio.throttle.io_serviced.1:0.Async": int64(3),
"blkio.throttle.io_serviced.1:0.Total": int64(3), "blkio.throttle.io_serviced.1:0.Total": int64(3),
"blkio.throttle.io_serviced.Total": int64(265885), "blkio.throttle.io_serviced.Total": int64(265885),
},
time.Unix(0, 0),
),
} }
acc.AssertContainsTaggedFields(t, "cgroup", fields, tags) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
func TestCgroupStatistics_8(t *testing.T) {
var acc testutil.Accumulator
var cg = &CGroup{
Paths: []string{"testdata/broken"},
Files: []string{"malformed.file", "memory.limit_in_bytes"},
logged: make(map[string]bool),
}
require.Error(t, acc.GatherError(cg.Gather))
require.Len(t, cg.logged, 1)
expected := []telegraf.Metric{
metric.New(
"cgroup",
map[string]string{"path": "testdata/broken"},
map[string]interface{}{"memory.limit_in_bytes": int64(1)},
time.Unix(0, 0),
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
// clear errors so we can check for new errors in next round
acc.Errors = nil
require.NoError(t, acc.GatherError(cg.Gather))
require.Len(t, cg.logged, 1)
} }

View File

@ -0,0 +1 @@
garbage

View File

@ -0,0 +1 @@
1