[plugins/input/docker] Make perdevice affect also cpu and add class granularity through perdevice_include/total_include (#7312)

This commit is contained in:
Andrés Álvarez 2021-03-03 20:02:04 +01:00 committed by GitHub
parent cc61251cc9
commit 786dca2d5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 432 additions and 84 deletions

View File

@ -43,12 +43,29 @@ to gather stats from the [Engine API](https://docs.docker.com/engine/api/v1.24/)
## Timeout for docker list, info, and stats commands
timeout = "5s"
## Whether to report for each container per-device blkio (8:0, 8:1...) and
## network (eth0, eth1, ...) stats or not
## Whether to report for each container per-device blkio (8:0, 8:1...),
## network (eth0, eth1, ...) and cpu (cpu0, cpu1, ...) stats or not.
## Usage of this setting is discouraged since it will be deprecated in favor of 'perdevice_include'.
## Default value is 'true' for backwards compatibility, please set it to 'false' so that 'perdevice_include' setting
## is honored.
perdevice = true
## Whether to report for each container total blkio and network stats or not
## Specifies for which classes a per-device metric should be issued
## Possible values are 'cpu' (cpu0, cpu1, ...), 'blkio' (8:0, 8:1, ...) and 'network' (eth0, eth1, ...)
## Please note that if 'perdevice' is set to 'true', 'blkio' and 'network' are always included regardless of this setting
# perdevice_include = ["cpu"]
## Whether to report for each container total blkio and network stats or not.
## Usage of this setting is discouraged since it will be deprecated in favor of 'total_include'.
## Default value is 'false' for backwards compatibility, please set it to 'true' so that 'total_include' setting
## is honored.
total = false
## Specifies for which classes a total metric should be issued. Total is an aggregate of the 'perdevice' values.
## Possible values are 'cpu', 'blkio' and 'network'
## Total 'cpu' is reported directly by Docker daemon, and 'network' and 'blkio' totals are aggregated by this plugin.
## Please note that if 'total' is set to 'false', only 'cpu' is honored from this setting ('blkio' and 'network' are dropped)
# total_include = ["cpu", "blkio", "network"]
## docker labels to include and exclude as tags. Globs accepted.
## Note that an empty array for both will include all labels as tags

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/internal/docker"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
@ -31,12 +32,14 @@ type Docker struct {
GatherServices bool `toml:"gather_services"`
Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
Total bool `toml:"total"`
TagEnvironment []string `toml:"tag_env"`
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
PerDeviceInclude []string `toml:"perdevice_include"`
Total bool `toml:"total"`
TotalInclude []string `toml:"total_include"`
TagEnvironment []string `toml:"tag_env"`
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
ContainerInclude []string `toml:"container_name_include"`
ContainerExclude []string `toml:"container_name_exclude"`
@ -72,12 +75,21 @@ const (
PB = 1000 * TB
defaultEndpoint = "unix:///var/run/docker.sock"
// perDeviceIncludeDeprecationWarning is logged by Init when the legacy
// 'perdevice' setting is 'true'.
perDeviceIncludeDeprecationWarning = "'perdevice' setting is set to 'true' so 'blkio' and 'network' metrics will " +
	"be collected. Please set it to 'false' and use 'perdevice_include' instead to control this behaviour as " +
	"'perdevice' will be deprecated"
// totalIncludeDeprecationWarning is logged by Init when the legacy 'total'
// setting is 'false'.
totalIncludeDeprecationWarning = "'total' setting is set to 'false' so 'blkio' and 'network' metrics will not be " +
	"collected. Please set it to 'true' and use 'total_include' instead to control this behaviour as 'total' " +
	"will be deprecated"
)
var (
sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`)
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
now = time.Now
sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`)
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
containerMetricClasses = []string{"cpu", "network", "blkio"}
now = time.Now
)
var sampleConfig = `
@ -110,13 +122,30 @@ var sampleConfig = `
## Timeout for docker list, info, and stats commands
timeout = "5s"
## Whether to report for each container per-device blkio (8:0, 8:1...) and
## network (eth0, eth1, ...) stats or not
## Whether to report for each container per-device blkio (8:0, 8:1...),
## network (eth0, eth1, ...) and cpu (cpu0, cpu1, ...) stats or not.
## Usage of this setting is discouraged since it will be deprecated in favor of 'perdevice_include'.
## Default value is 'true' for backwards compatibility, please set it to 'false' so that 'perdevice_include' setting
## is honored.
perdevice = true
## Whether to report for each container total blkio and network stats or not
## Specifies for which classes a per-device metric should be issued
## Possible values are 'cpu' (cpu0, cpu1, ...), 'blkio' (8:0, 8:1, ...) and 'network' (eth0, eth1, ...)
## Please note that this setting has no effect if 'perdevice' is set to 'true'
# perdevice_include = ["cpu"]
## Whether to report for each container total blkio and network stats or not.
## Usage of this setting is discouraged since it will be deprecated in favor of 'total_include'.
## Default value is 'false' for backwards compatibility, please set it to 'true' so that 'total_include' setting
## is honored.
total = false
## Specifies for which classes a total metric should be issued. Total is an aggregated of the 'perdevice' values.
## Possible values are 'cpu', 'blkio' and 'network'
## Total 'cpu' is reported directly by Docker daemon, and 'network' and 'blkio' totals are aggregated by this plugin.
## Please note that this setting has no effect if 'total' is set to 'false'
# total_include = ["cpu", "blkio", "network"]
## Which environment variables should we use as a tag
##tag_env = ["JAVA_HOME", "HEAP_SIZE"]
@ -141,6 +170,41 @@ func (d *Docker) Description() string {
return "Read metrics about docker containers"
}
// Init validates the 'perdevice_include' and 'total_include' settings against
// containerMetricClasses and then reconciles them with the legacy 'perdevice'
// and 'total' booleans.
//
// It returns an error, wrapping the validation error, when either include
// list fails the choice.CheckSlice validation. On success the receiver's
// PerDeviceInclude and TotalInclude may have been rewritten:
//   - when PerDevice is true, "network" and "blkio" are appended to
//     PerDeviceInclude if they are not already present;
//   - when Total is false, TotalInclude is reduced to just "cpu" if it
//     contained "cpu", or to an empty list otherwise.
func (d *Docker) Init() error {
	if err := choice.CheckSlice(d.PerDeviceInclude, containerMetricClasses); err != nil {
		return fmt.Errorf("error validating 'perdevice_include' setting : %w", err)
	}

	if err := choice.CheckSlice(d.TotalInclude, containerMetricClasses); err != nil {
		return fmt.Errorf("error validating 'total_include' setting : %w", err)
	}

	// Temporary logic needed for backwards compatibility until 'perdevice' setting is removed.
	if d.PerDevice {
		d.Log.Warn(perDeviceIncludeDeprecationWarning)
		// Order matters for callers comparing the resulting slice: "network" first, then "blkio".
		for _, class := range []string{"network", "blkio"} {
			if !choice.Contains(class, d.PerDeviceInclude) {
				d.PerDeviceInclude = append(d.PerDeviceInclude, class)
			}
		}
	}

	// Temporary logic needed for backwards compatibility until 'total' setting is removed.
	if !d.Total {
		d.Log.Warn(totalIncludeDeprecationWarning)
		// Only the "cpu" class survives when 'total' is false.
		if choice.Contains("cpu", d.TotalInclude) {
			d.TotalInclude = []string{"cpu"}
		} else {
			d.TotalInclude = []string{}
		}
	}

	return nil
}
// Gather metrics from the docker server.
func (d *Docker) Gather(acc telegraf.Accumulator) error {
if d.client == nil {
@ -516,7 +580,7 @@ func (d *Docker) gatherContainerInspect(
for _, envvar := range info.Config.Env {
for _, configvar := range d.TagEnvironment {
dockEnv := strings.SplitN(envvar, "=", 2)
//check for presence of tag in whitelist
// check for presence of tag in whitelist
if len(dockEnv) == 2 && len(strings.TrimSpace(dockEnv[1])) != 0 && configvar == dockEnv[0] {
tags[dockEnv[0]] = dockEnv[1]
}
@ -563,7 +627,7 @@ func (d *Docker) gatherContainerInspect(
}
}
parseContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total, daemonOSType)
parseContainerStats(v, acc, tags, container.ID, d.PerDeviceInclude, d.TotalInclude, daemonOSType)
return nil
}
@ -573,8 +637,8 @@ func parseContainerStats(
acc telegraf.Accumulator,
tags map[string]string,
id string,
perDevice bool,
total bool,
perDeviceInclude []string,
totalInclude []string,
daemonOSType string,
) {
tm := stat.Read
@ -643,48 +707,52 @@ func parseContainerStats(
acc.AddFields("docker_container_mem", memfields, tags, tm)
cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemUsage,
"throttling_periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime,
"container_id": id,
}
if daemonOSType != "windows" {
previousCPU := stat.PreCPUStats.CPUUsage.TotalUsage
previousSystem := stat.PreCPUStats.SystemUsage
cpuPercent := CalculateCPUPercentUnix(previousCPU, previousSystem, stat)
cpufields["usage_percent"] = cpuPercent
} else {
cpuPercent := calculateCPUPercentWindows(stat)
cpufields["usage_percent"] = cpuPercent
}
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
acc.AddFields("docker_container_cpu", cpufields, cputags, tm)
// If we have OnlineCPUs field, then use it to restrict stats gathering to only Online CPUs
// (https://github.com/moby/moby/commit/115f91d7575d6de6c7781a96a082f144fd17e400)
var percpuusage []uint64
if stat.CPUStats.OnlineCPUs > 0 {
percpuusage = stat.CPUStats.CPUUsage.PercpuUsage[:stat.CPUStats.OnlineCPUs]
} else {
percpuusage = stat.CPUStats.CPUUsage.PercpuUsage
}
for i, percpu := range percpuusage {
percputags := copyTags(tags)
percputags["cpu"] = fmt.Sprintf("cpu%d", i)
fields := map[string]interface{}{
"usage_total": percpu,
"container_id": id,
if choice.Contains("cpu", totalInclude) {
cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemUsage,
"throttling_periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime,
"container_id": id,
}
if daemonOSType != "windows" {
previousCPU := stat.PreCPUStats.CPUUsage.TotalUsage
previousSystem := stat.PreCPUStats.SystemUsage
cpuPercent := CalculateCPUPercentUnix(previousCPU, previousSystem, stat)
cpufields["usage_percent"] = cpuPercent
} else {
cpuPercent := calculateCPUPercentWindows(stat)
cpufields["usage_percent"] = cpuPercent
}
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
acc.AddFields("docker_container_cpu", cpufields, cputags, tm)
}
if choice.Contains("cpu", perDeviceInclude) {
// If we have OnlineCPUs field, then use it to restrict stats gathering to only Online CPUs
// (https://github.com/moby/moby/commit/115f91d7575d6de6c7781a96a082f144fd17e400)
var percpuusage []uint64
if stat.CPUStats.OnlineCPUs > 0 {
percpuusage = stat.CPUStats.CPUUsage.PercpuUsage[:stat.CPUStats.OnlineCPUs]
} else {
percpuusage = stat.CPUStats.CPUUsage.PercpuUsage
}
for i, percpu := range percpuusage {
percputags := copyTags(tags)
percputags["cpu"] = fmt.Sprintf("cpu%d", i)
fields := map[string]interface{}{
"usage_total": percpu,
"container_id": id,
}
acc.AddFields("docker_container_cpu", fields, percputags, tm)
}
acc.AddFields("docker_container_cpu", fields, percputags, tm)
}
totalNetworkStatMap := make(map[string]interface{})
@ -701,12 +769,12 @@ func parseContainerStats(
"container_id": id,
}
// Create a new network tag dictionary for the "network" tag
if perDevice {
if choice.Contains("network", perDeviceInclude) {
nettags := copyTags(tags)
nettags["network"] = network
acc.AddFields("docker_container_net", netfields, nettags, tm)
}
if total {
if choice.Contains("network", totalInclude) {
for field, value := range netfields {
if field == "container_id" {
continue
@ -733,27 +801,21 @@ func parseContainerStats(
}
// totalNetworkStatMap could be empty if container is running with --net=host.
if total && len(totalNetworkStatMap) != 0 {
if choice.Contains("network", totalInclude) && len(totalNetworkStatMap) != 0 {
nettags := copyTags(tags)
nettags["network"] = "total"
totalNetworkStatMap["container_id"] = id
acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, tm)
}
gatherBlockIOMetrics(stat, acc, tags, tm, id, perDevice, total)
perDeviceBlkio := choice.Contains("blkio", perDeviceInclude)
totalBlkio := choice.Contains("blkio", totalInclude)
gatherBlockIOMetrics(stat, acc, tags, tm, id, perDeviceBlkio, totalBlkio)
}
func gatherBlockIOMetrics(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,
tm time.Time,
id string,
perDevice bool,
total bool,
) {
blkioStats := stat.BlkioStats
// Make a map of devices to their block io stats
// Make a map of devices to their block io stats
func getDeviceStatMap(blkioStats types.BlkioStats) map[string]map[string]interface{} {
deviceStatMap := make(map[string]map[string]interface{})
for _, metric := range blkioStats.IoServiceBytesRecursive {
@ -811,6 +873,20 @@ func gatherBlockIOMetrics(
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
deviceStatMap[device]["sectors_recursive"] = metric.Value
}
return deviceStatMap
}
func gatherBlockIOMetrics(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,
tm time.Time,
id string,
perDevice bool,
total bool,
) {
blkioStats := stat.BlkioStats
deviceStatMap := getDeviceStatMap(blkioStats)
totalStatMap := make(map[string]interface{})
for device, fields := range deviceStatMap {
@ -942,12 +1018,14 @@ func (d *Docker) getNewClient() (Client, error) {
func init() {
inputs.Add("docker", func() telegraf.Input {
return &Docker{
PerDevice: true,
Timeout: internal.Duration{Duration: time.Second * 5},
Endpoint: defaultEndpoint,
newEnvClient: NewEnvClient,
newClient: NewClient,
filtersCreated: false,
PerDevice: true,
PerDeviceInclude: []string{"cpu"},
TotalInclude: []string{"cpu", "blkio", "network"},
Timeout: internal.Duration{Duration: time.Second * 5},
Endpoint: defaultEndpoint,
newEnvClient: NewEnvClient,
newClient: NewClient,
filtersCreated: false,
}
})
}

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"io/ioutil"
"reflect"
"sort"
"strings"
"testing"
@ -12,6 +13,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
@ -110,7 +112,7 @@ func TestDockerGatherContainerStats(t *testing.T) {
"container_image": "redis/image",
}
parseContainerStats(stats, &acc, tags, "123456789", true, true, "linux")
parseContainerStats(stats, &acc, tags, "123456789", containerMetricClasses, containerMetricClasses, "linux")
// test docker_container_net measurement
netfields := map[string]interface{}{
@ -396,6 +398,8 @@ func TestContainerLabels(t *testing.T) {
newClient: newClientFunc,
LabelInclude: tt.include,
LabelExclude: tt.exclude,
Total: true,
TotalInclude: []string{"cpu"},
}
err := d.Gather(&acc)
@ -751,6 +755,9 @@ func TestDockerGatherInfo(t *testing.T) {
newClient: newClient,
TagEnvironment: []string{"ENVVAR1", "ENVVAR2", "ENVVAR3", "ENVVAR5",
"ENVVAR6", "ENVVAR7", "ENVVAR8", "ENVVAR9"},
PerDeviceInclude: []string{"cpu", "network", "blkio"},
Total: true,
TotalInclude: []string{""},
}
err := acc.GatherError(d.Gather)
@ -1117,3 +1124,243 @@ func TestHostnameFromID(t *testing.T) {
}
}
// Test_parseContainerStatsPerDeviceAndTotal checks which cpu, network and
// blkio metrics parseContainerStats emits for different combinations of the
// perDeviceInclude and totalInclude class lists. Metrics are compared using
// the testutil.OnlyTags option, so the expected metrics carry no fields.
func Test_parseContainerStatsPerDeviceAndTotal(t *testing.T) {
	type args struct {
		stat             *types.StatsJSON
		acc              telegraf.Accumulator
		tags             map[string]string
		id               string
		perDeviceInclude []string
		totalInclude     []string
		daemonOSType     string
	}

	var (
		testDate       = time.Date(2018, 6, 14, 5, 51, 53, 266176036, time.UTC)
		metricCPUTotal = testutil.MustMetric(
			"docker_container_cpu",
			map[string]string{
				"cpu": "cpu-total",
			},
			map[string]interface{}{},
			testDate)

		metricCPU0 = testutil.MustMetric(
			"docker_container_cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{},
			testDate)
		metricCPU1 = testutil.MustMetric(
			"docker_container_cpu",
			map[string]string{
				"cpu": "cpu1",
			},
			map[string]interface{}{},
			testDate)

		metricNetworkTotal = testutil.MustMetric(
			"docker_container_net",
			map[string]string{
				"network": "total",
			},
			map[string]interface{}{},
			testDate)

		metricNetworkEth0 = testutil.MustMetric(
			"docker_container_net",
			map[string]string{
				"network": "eth0",
			},
			map[string]interface{}{},
			testDate)

		// Second interface: must be tagged "eth1", not a duplicate of "eth0".
		// NOTE(review): assumes testStats() exposes interfaces eth0 and eth1 — confirm.
		metricNetworkEth1 = testutil.MustMetric(
			"docker_container_net",
			map[string]string{
				"network": "eth1",
			},
			map[string]interface{}{},
			testDate)
		metricBlkioTotal = testutil.MustMetric(
			"docker_container_blkio",
			map[string]string{
				"device": "total",
			},
			map[string]interface{}{},
			testDate)
		metricBlkio6_0 = testutil.MustMetric(
			"docker_container_blkio",
			map[string]string{
				"device": "6:0",
			},
			map[string]interface{}{},
			testDate)
		metricBlkio6_1 = testutil.MustMetric(
			"docker_container_blkio",
			map[string]string{
				"device": "6:1",
			},
			map[string]interface{}{},
			testDate)
	)
	stats := testStats()
	tests := []struct {
		name     string
		args     args
		expected []telegraf.Metric
	}{
		{
			name: "Per device and total metrics enabled",
			args: args{
				stat:             stats,
				perDeviceInclude: containerMetricClasses,
				totalInclude:     containerMetricClasses,
			},
			expected: []telegraf.Metric{
				metricCPUTotal, metricCPU0, metricCPU1,
				metricNetworkTotal, metricNetworkEth0, metricNetworkEth1,
				metricBlkioTotal, metricBlkio6_0, metricBlkio6_1,
			},
		},
		{
			name: "Per device metrics enabled",
			args: args{
				stat:             stats,
				perDeviceInclude: containerMetricClasses,
				totalInclude:     []string{},
			},
			expected: []telegraf.Metric{
				metricCPU0, metricCPU1,
				metricNetworkEth0, metricNetworkEth1,
				metricBlkio6_0, metricBlkio6_1,
			},
		},
		{
			name: "Total metrics enabled",
			args: args{
				stat:             stats,
				perDeviceInclude: []string{},
				totalInclude:     containerMetricClasses,
			},
			expected: []telegraf.Metric{metricCPUTotal, metricNetworkTotal, metricBlkioTotal},
		},
		{
			name: "Per device and total metrics disabled",
			args: args{
				stat:             stats,
				perDeviceInclude: []string{},
				totalInclude:     []string{},
			},
			expected: []telegraf.Metric{},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var acc testutil.Accumulator
			parseContainerStats(tt.args.stat, &acc, tt.args.tags, tt.args.id, tt.args.perDeviceInclude,
				tt.args.totalInclude, tt.args.daemonOSType)

			// Only the three measurements controlled by the include lists are of interest here.
			actual := FilterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool {
				return choice.Contains(m.Name(),
					[]string{"docker_container_cpu", "docker_container_net", "docker_container_blkio"})
			})
			testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.OnlyTags(), testutil.SortMetrics())
		})
	}
}
// TestDocker_Init exercises Docker.Init with a table of setting combinations:
// unknown metric classes must be rejected, and the legacy 'perdevice'/'total'
// flags must rewrite the include lists as expected.
func TestDocker_Init(t *testing.T) {
	type settings struct {
		PerDevice        bool
		PerDeviceInclude []string
		Total            bool
		TotalInclude     []string
	}
	cases := []struct {
		name                 string
		conf                 settings
		wantErr              bool
		wantPerDeviceInclude []string
		wantTotalInclude     []string
	}{
		{
			name: "Unsupported perdevice_include setting",
			conf: settings{
				PerDevice:        false,
				PerDeviceInclude: []string{"nonExistentClass"},
				Total:            false,
				TotalInclude:     []string{"cpu"},
			},
			wantErr:              true,
			wantPerDeviceInclude: []string{},
			wantTotalInclude:     []string{},
		},
		{
			name: "Unsupported total_include setting",
			conf: settings{
				PerDevice:        false,
				PerDeviceInclude: []string{"cpu"},
				Total:            false,
				TotalInclude:     []string{"nonExistentClass"},
			},
			wantErr:              true,
			wantPerDeviceInclude: []string{},
			wantTotalInclude:     []string{},
		},
		{
			name: "PerDevice true adds network and blkio",
			conf: settings{
				PerDevice:        true,
				PerDeviceInclude: []string{"cpu"},
				Total:            true,
				TotalInclude:     []string{"cpu"},
			},
			wantErr:              false,
			wantPerDeviceInclude: []string{"cpu", "network", "blkio"},
			wantTotalInclude:     []string{"cpu"},
		},
		{
			name: "Total false removes network and blkio",
			conf: settings{
				PerDevice:        false,
				PerDeviceInclude: []string{"cpu"},
				Total:            false,
				TotalInclude:     []string{"cpu", "network", "blkio"},
			},
			wantErr:              false,
			wantPerDeviceInclude: []string{"cpu"},
			wantTotalInclude:     []string{"cpu"},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			plugin := &Docker{
				Log:              testutil.Logger{},
				PerDevice:        tc.conf.PerDevice,
				PerDeviceInclude: tc.conf.PerDeviceInclude,
				Total:            tc.conf.Total,
				TotalInclude:     tc.conf.TotalInclude,
			}

			err := plugin.Init()
			if gotErr := err != nil; gotErr != tc.wantErr {
				t.Errorf("Init() error = %v, wantErr %v", err, tc.wantErr)
			}

			// The include lists are only meaningful when Init succeeded.
			if err != nil {
				return
			}
			if !reflect.DeepEqual(plugin.PerDeviceInclude, tc.wantPerDeviceInclude) {
				t.Errorf("Perdevice include: got '%v', want '%v'", plugin.PerDeviceInclude, tc.wantPerDeviceInclude)
			}
			if !reflect.DeepEqual(plugin.TotalInclude, tc.wantTotalInclude) {
				t.Errorf("Total include: got '%v', want '%v'", plugin.TotalInclude, tc.wantTotalInclude)
			}
		})
	}
}

View File

@ -397,7 +397,6 @@ func (a *Accumulator) AssertDoesNotContainsTaggedFields(
}
return
}
func (a *Accumulator) AssertContainsFields(
t *testing.T,
measurement string,

View File

@ -6,6 +6,7 @@ import (
"os"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
@ -63,3 +64,9 @@ func TestMetric(value interface{}, name ...string) telegraf.Metric {
)
return pt
}
// OnlyTags returns a cmp.Option intended for comparing metrics by their
// "Tags" only. It applies cmp.Ignore to every node whose simplified path
// (cmp.Path.String) is neither the empty string nor exactly "Tags".
func OnlyTags() cmp.Option {
	ignorePath := func(p cmp.Path) bool {
		simplified := p.String()
		return simplified != "" && simplified != "Tags"
	}
	return cmp.FilterPath(ignorePath, cmp.Ignore())
}