feat(inputs.ethtool): Gather statistics from namespaces (#11895)

This commit is contained in:
zeffron 2022-11-15 07:54:58 -08:00 committed by GitHub
parent 5d87f1aeea
commit c7a1d9e28e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 723 additions and 62 deletions

2
go.mod
View File

@ -378,7 +378,7 @@ require (
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.5.0 // indirect
github.com/vishvananda/netlink v1.2.1-beta.2 // indirect
github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207 // indirect
github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect

View File

@ -29,6 +29,23 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## - skip: ignore interfaces that are marked down
# down_interfaces = "expose"
## Reading statistics from interfaces in additional namespaces is also
## supported, so long as the namespaces are named (have a symlink in
## /var/run/netns). The telegraf process will also need the CAP_SYS_ADMIN
## permission.
## By default, only the current namespace will be used. For additional
## namespace support, at least one of `namespace_include` and
## `namespace_exclude` must be provided.
## To include all namespaces, set `namespace_include` to `["*"]`.
## The initial namespace (if anonymous) can be specified with the empty
## string ("").
## List of namespaces to pull metrics for
# namespace_include = []
## List of namespace to ignore when pulling metrics.
# namespace_exclude = []
## Some drivers declare statistics with extra whitespace, different spacing,
## and mix cases. This list, when enabled, can be used to clean the keys.
## Here are the current possible normalizations:
@ -46,6 +63,36 @@ Interfaces can be included or ignored using:
Note that loopback interfaces will be automatically ignored.
## Namespaces
Metrics from interfaces in additional namespaces will be retrieved if either
`namespace_include` or `namespace_exclude` is configured (to a non-empty list).
This requires `CAP_SYS_ADMIN` permissions to switch namespaces, which can be
granted to telegraf in several ways. The two recommended ways are listed below:
### Using systemd capabilities
If you are using systemd to run Telegraf, you may run
`systemctl edit telegraf.service` and add the following:
```text
[Service]
AmbientCapabilities=CAP_SYS_ADMIN
```
### Configuring executable capabilities
If you are not using systemd to run Telegraf, you can configure the Telegraf
executable to have `CAP_SYS_ADMIN` when run.
```sh
sudo setcap CAP_SYS_ADMIN+epi $(which telegraf)
```
N.B.: This capability is a filesystem attribute on the binary itself. The
attribute needs to be re-applied if the Telegraf binary is rotated (e.g. on
installation of new a Telegraf version from the system package manager).
## Metrics
Metrics are dependent on the network device and driver.

View File

@ -2,7 +2,6 @@ package ethtool
import (
_ "embed"
"net"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
@ -15,9 +14,9 @@ var downInterfacesBehaviors = []string{"expose", "skip"}
type Command interface {
Init() error
DriverName(intf string) (string, error)
Interfaces() ([]net.Interface, error)
Stats(intf string) (map[string]uint64, error)
DriverName(intf NamespacedInterface) (string, error)
Interfaces(includeNamespaces bool) ([]NamespacedInterface, error)
Stats(intf NamespacedInterface) (map[string]uint64, error)
}
type Ethtool struct {
@ -30,12 +29,20 @@ type Ethtool struct {
// Behavior regarding metrics for downed interfaces
DownInterfaces string `toml:" down_interfaces"`
// This is the list of namespace names to include
NamespaceInclude []string `toml:"namespace_include"`
// This is the list of namespace names to ignore
NamespaceExclude []string `toml:"namespace_exclude"`
// Normalization on the key names
NormalizeKeys []string `toml:"normalize_keys"`
Log telegraf.Logger `toml:"-"`
interfaceFilter filter.Filter
namespaceFilter filter.Filter
includeNamespaces bool
// the ethtool command
command Command
@ -48,6 +55,7 @@ func (*Ethtool) SampleConfig() string {
const (
pluginName = "ethtool"
tagInterface = "interface"
tagNamespace = "namespace"
tagDriverName = "driver"
fieldInterfaceUp = "interface_up"
)

View File

@ -6,12 +6,14 @@ package ethtool
import (
"fmt"
"net"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"github.com/pkg/errors"
ethtoolLib "github.com/safchain/ethtool"
"github.com/vishvananda/netns"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
@ -20,7 +22,8 @@ import (
)
type CommandEthtool struct {
ethtool *ethtoolLib.Ethtool
Log telegraf.Logger
namespaceGoroutines map[string]*NamespaceGoroutine
}
func (e *Ethtool) Init() error {
@ -38,12 +41,29 @@ func (e *Ethtool) Init() error {
return fmt.Errorf("down_interfaces: %w", err)
}
// If no namespace include or exclude filters were provided, then default
// to just the initial namespace.
e.includeNamespaces = len(e.NamespaceInclude) > 0 || len(e.NamespaceExclude) > 0
if len(e.NamespaceInclude) == 0 && len(e.NamespaceExclude) == 0 {
e.NamespaceInclude = []string{""}
} else if len(e.NamespaceInclude) == 0 {
e.NamespaceInclude = []string{"*"}
}
e.namespaceFilter, err = filter.NewIncludeExcludeFilter(e.NamespaceInclude, e.NamespaceExclude)
if err != nil {
return err
}
if command, ok := e.command.(*CommandEthtool); ok {
command.Log = e.Log
}
return e.command.Init()
}
func (e *Ethtool) Gather(acc telegraf.Accumulator) error {
// Get the list of interfaces
interfaces, err := e.command.Interfaces()
interfaces, err := e.command.Interfaces(e.includeNamespaces)
if err != nil {
acc.AddError(err)
return nil
@ -53,10 +73,11 @@ func (e *Ethtool) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, iface := range interfaces {
// Check this isn't a loop back and that its matched by the filter(s)
if e.interfaceEligibleForGather(iface) {
wg.Add(1)
go func(i net.Interface) {
go func(i NamespacedInterface) {
e.gatherEthtoolStats(i, acc)
wg.Done()
}(iface)
@ -68,12 +89,17 @@ func (e *Ethtool) Gather(acc telegraf.Accumulator) error {
return nil
}
func (e *Ethtool) interfaceEligibleForGather(iface net.Interface) bool {
func (e *Ethtool) interfaceEligibleForGather(iface NamespacedInterface) bool {
// Don't gather if it is a loop back, or it isn't matched by the filter
if isLoopback(iface) || !e.interfaceFilter.Match(iface.Name) {
return false
}
// Don't gather if it's not in a namespace matched by the filter
if !e.namespaceFilter.Match(iface.Namespace.Name()) {
return false
}
// For downed interfaces, gather only for "expose"
if !interfaceUp(iface) {
return e.DownInterfaces == "expose"
@ -83,11 +109,12 @@ func (e *Ethtool) interfaceEligibleForGather(iface net.Interface) bool {
}
// Gather the stats for the interface.
func (e *Ethtool) gatherEthtoolStats(iface net.Interface, acc telegraf.Accumulator) {
func (e *Ethtool) gatherEthtoolStats(iface NamespacedInterface, acc telegraf.Accumulator) {
tags := make(map[string]string)
tags[tagInterface] = iface.Name
tags[tagNamespace] = iface.Namespace.Name()
driverName, err := e.command.DriverName(iface.Name)
driverName, err := e.command.DriverName(iface)
if err != nil {
driverErr := errors.Wrapf(err, "%s driver", iface.Name)
acc.AddError(driverErr)
@ -97,7 +124,7 @@ func (e *Ethtool) gatherEthtoolStats(iface net.Interface, acc telegraf.Accumulat
tags[tagDriverName] = driverName
fields := make(map[string]interface{})
stats, err := e.command.Stats(iface.Name)
stats, err := e.command.Stats(iface)
if err != nil {
statsErr := errors.Wrapf(err, "%s stats", iface.Name)
acc.AddError(statsErr)
@ -157,11 +184,11 @@ func inStringSlice(slice []string, value string) bool {
return false
}
func isLoopback(iface net.Interface) bool {
func isLoopback(iface NamespacedInterface) bool {
return (iface.Flags & net.FlagLoopback) != 0
}
func interfaceUp(iface net.Interface) bool {
func interfaceUp(iface NamespacedInterface) bool {
return (iface.Flags & net.FlagUp) != 0
}
@ -170,34 +197,107 @@ func NewCommandEthtool() *CommandEthtool {
}
func (c *CommandEthtool) Init() error {
if c.ethtool != nil {
// Create the goroutine for the initial namespace
initialNamespace, err := netns.Get()
if err != nil {
return err
}
namespaceGoroutine := &NamespaceGoroutine{
name: "",
handle: initialNamespace,
Log: c.Log,
}
if err := namespaceGoroutine.Start(); err != nil {
c.Log.Errorf(`Failed to start goroutine for the initial namespace: %s`, err)
return err
}
c.namespaceGoroutines = map[string]*NamespaceGoroutine{
"": namespaceGoroutine,
}
return nil
}
e, err := ethtoolLib.NewEthtool()
if err == nil {
c.ethtool = e
func (c *CommandEthtool) DriverName(intf NamespacedInterface) (driver string, err error) {
return intf.Namespace.DriverName(intf)
}
return err
func (c *CommandEthtool) Stats(intf NamespacedInterface) (stats map[string]uint64, err error) {
return intf.Namespace.Stats(intf)
}
func (c *CommandEthtool) DriverName(intf string) (string, error) {
return c.ethtool.DriverName(intf)
}
func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterface, error) {
const namespaceDirectory = "/var/run/netns"
func (c *CommandEthtool) Stats(intf string) (map[string]uint64, error) {
return c.ethtool.Stats(intf)
}
func (c *CommandEthtool) Interfaces() ([]net.Interface, error) {
// Get the list of interfaces
interfaces, err := net.Interfaces()
initialNamespace, err := netns.Get()
if err != nil {
c.Log.Errorf("Could not get initial namespace: %s", err)
return nil, err
}
return interfaces, nil
// Gather the list of namespace names to from which to retrieve interfaces.
initialNamespaceIsNamed := false
var namespaceNames []string
// Handles are only used to create namespaced goroutines. We don't prefill
// with the handle for the initial namespace because we've already created
// its goroutine in Init().
handles := map[string]netns.NsHandle{}
if includeNamespaces {
namespaces, err := os.ReadDir(namespaceDirectory)
if err != nil {
c.Log.Warnf("Could not find namespace directory: %s", err)
}
// We'll always have at least the initial namespace, so add one to ensure
// we have capacity for it.
namespaceNames = make([]string, 0, len(namespaces)+1)
for _, namespace := range namespaces {
name := namespace.Name()
namespaceNames = append(namespaceNames, name)
handle, err := netns.GetFromPath(filepath.Join(namespaceDirectory, name))
if err != nil {
c.Log.Warnf(`Could not get handle for namespace "%s": %s`, name, err)
continue
}
handles[name] = handle
if handle.Equal(initialNamespace) {
initialNamespaceIsNamed = true
}
}
}
// We don't want to gather interfaces from the same namespace twice, and
// it's possible, though unlikely, that the initial namespace is also a
// named interface.
if !initialNamespaceIsNamed {
namespaceNames = append(namespaceNames, "")
}
allInterfaces := make([]NamespacedInterface, 0)
for _, namespace := range namespaceNames {
if _, ok := c.namespaceGoroutines[namespace]; !ok {
c.namespaceGoroutines[namespace] = &NamespaceGoroutine{
name: namespace,
handle: handles[namespace],
Log: c.Log,
}
if err := c.namespaceGoroutines[namespace].Start(); err != nil {
c.Log.Errorf(`Failed to start goroutine for namespace "%s": %s`, namespace, err)
delete(c.namespaceGoroutines, namespace)
continue
}
}
interfaces, err := c.namespaceGoroutines[namespace].Interfaces()
if err != nil {
c.Log.Warnf(`Could not get interfaces from namespace "%s": %s`, namespace, err)
continue
}
allInterfaces = append(allInterfaces, interfaces...)
}
return allInterfaces, nil
}
func init() {
@ -205,6 +305,8 @@ func init() {
return &Ethtool{
InterfaceInclude: []string{},
InterfaceExclude: []string{},
NamespaceInclude: []string{},
NamespaceExclude: []string{},
command: NewCommandEthtool(),
}
})

View File

@ -12,17 +12,40 @@ import (
"github.com/influxdata/telegraf/testutil"
)
var command *Ethtool
var interfaceMap map[string]*InterfaceMock
var (
command *Ethtool
interfaceMap map[string]*InterfaceMock
)
type InterfaceMock struct {
Name string
DriverName string
NamespaceName string
Stat map[string]uint64
LoopBack bool
InterfaceUp bool
}
type NamespaceMock struct {
name string
}
func (n *NamespaceMock) Name() string {
return n.name
}
func (n *NamespaceMock) Interfaces() ([]NamespacedInterface, error) {
return nil, errors.New("it is a test bug to invoke this function")
}
func (n *NamespaceMock) DriverName(_ NamespacedInterface) (string, error) {
return "", errors.New("it is a test bug to invoke this function")
}
func (n *NamespaceMock) Stats(_ NamespacedInterface) (map[string]uint64, error) {
return nil, errors.New("it is a test bug to invoke this function")
}
type CommandEthtoolMock struct {
InterfaceMap map[string]*InterfaceMock
}
@ -32,17 +55,23 @@ func (c *CommandEthtoolMock) Init() error {
return nil
}
func (c *CommandEthtoolMock) DriverName(intf string) (string, error) {
i := c.InterfaceMap[intf]
func (c *CommandEthtoolMock) DriverName(intf NamespacedInterface) (string, error) {
i := c.InterfaceMap[intf.Name]
if i != nil {
return i.DriverName, nil
}
return "", errors.New("interface not found")
}
func (c *CommandEthtoolMock) Interfaces() ([]net.Interface, error) {
interfaceNames := make([]net.Interface, 0)
func (c *CommandEthtoolMock) Interfaces(includeNamespaces bool) ([]NamespacedInterface, error) {
namespaces := map[string]*NamespaceMock{"": {name: ""}}
interfaces := make([]NamespacedInterface, 0)
for k, v := range c.InterfaceMap {
if v.NamespaceName != "" && !includeNamespaces {
continue
}
var flag net.Flags
// When interface is up
if v.InterfaceUp {
@ -61,13 +90,27 @@ func (c *CommandEthtoolMock) Interfaces() ([]net.Interface, error) {
HardwareAddr: nil,
Flags: flag,
}
interfaceNames = append(interfaceNames, iface)
// Ensure there is a namespace if necessary
if _, ok := namespaces[v.NamespaceName]; !ok {
namespaces[v.NamespaceName] = &NamespaceMock{
name: v.NamespaceName,
}
return interfaceNames, nil
}
func (c *CommandEthtoolMock) Stats(intf string) (map[string]uint64, error) {
i := c.InterfaceMap[intf]
interfaces = append(
interfaces,
NamespacedInterface{
Interface: iface,
Namespace: namespaces[v.NamespaceName],
},
)
}
return interfaces, nil
}
func (c *CommandEthtoolMock) Stats(intf NamespacedInterface) (map[string]uint64, error) {
i := c.InterfaceMap[intf.Name]
if i != nil {
return i.Stat, nil
}
@ -176,7 +219,7 @@ func setup() {
"tx_tso_fallbacks": 0,
"tx_tso_long_headers": 0,
}
eth1 := &InterfaceMock{"eth1", "driver1", eth1Stat, false, true}
eth1 := &InterfaceMock{"eth1", "driver1", "", eth1Stat, false, true}
interfaceMap[eth1.Name] = eth1
eth2Stat := map[string]uint64{
@ -278,14 +321,218 @@ func setup() {
"tx_tso_fallbacks": 0,
"tx_tso_long_headers": 0,
}
eth2 := &InterfaceMock{"eth2", "driver1", eth2Stat, false, false}
eth2 := &InterfaceMock{"eth2", "driver1", "", eth2Stat, false, false}
interfaceMap[eth2.Name] = eth2
eth3Stat := map[string]uint64{
"interface_up": 1,
"port_rx_1024_to_15xx": 25167245,
"port_rx_128_to_255": 1573526387,
"port_rx_15xx_to_jumbo": 137819058,
"port_rx_256_to_511": 772038107,
"port_rx_512_to_1023": 78294457,
"port_rx_64": 8798065,
"port_rx_65_to_127": 450348015,
"port_rx_bad": 0,
"port_rx_bad_bytes": 0,
"port_rx_bad_gtjumbo": 0,
"port_rx_broadcast": 6428250,
"port_rx_bytes": 893460472634,
"port_rx_control": 0,
"port_rx_dp_di_dropped_packets": 2772680304,
"port_rx_dp_hlb_fetch": 0,
"port_rx_dp_hlb_wait": 0,
"port_rx_dp_q_disabled_packets": 0,
"port_rx_dp_streaming_packets": 0,
"port_rx_good": 3045991334,
"port_rx_good_bytes": 893460472927,
"port_rx_gtjumbo": 0,
"port_rx_lt64": 0,
"port_rx_multicast": 1639566045,
"port_rx_nodesc_drops": 0,
"port_rx_overflow": 0,
"port_rx_packets": 3045991334,
"port_rx_pause": 0,
"port_rx_pm_discard_bb_overflow": 0,
"port_rx_pm_discard_mapping": 0,
"port_rx_pm_discard_qbb": 0,
"port_rx_pm_discard_vfifo_full": 0,
"port_rx_pm_trunc_bb_overflow": 0,
"port_rx_pm_trunc_qbb": 0,
"port_rx_pm_trunc_vfifo_full": 0,
"port_rx_unicast": 1399997040,
"port_tx_1024_to_15xx": 236,
"port_tx_128_to_255": 275090219,
"port_tx_15xx_to_jumbo": 926,
"port_tx_256_to_511": 48567221,
"port_tx_512_to_1023": 5142016,
"port_tx_64": 113903973,
"port_tx_65_to_127": 161935699,
"port_tx_broadcast": 8,
"port_tx_bytes": 94357131016,
"port_tx_control": 0,
"port_tx_lt64": 0,
"port_tx_multicast": 325891647,
"port_tx_packets": 604640290,
"port_tx_pause": 0,
"port_tx_unicast": 278748635,
"ptp_bad_syncs": 1,
"ptp_fast_syncs": 1,
"ptp_filter_matches": 0,
"ptp_good_syncs": 136151,
"ptp_invalid_sync_windows": 0,
"ptp_no_time_syncs": 1,
"ptp_non_filter_matches": 0,
"ptp_oversize_sync_windows": 53,
"ptp_rx_no_timestamp": 0,
"ptp_rx_timestamp_packets": 0,
"ptp_sync_timeouts": 1,
"ptp_timestamp_packets": 0,
"ptp_tx_timestamp_packets": 0,
"ptp_undersize_sync_windows": 3,
"rx-0.rx_packets": 55659234,
"rx-1.rx_packets": 87880538,
"rx-2.rx_packets": 26746234,
"rx-3.rx_packets": 103026471,
"rx-4.rx_packets": 0,
"rx_eth_crc_err": 0,
"rx_frm_trunc": 0,
"rx_inner_ip_hdr_chksum_err": 0,
"rx_inner_tcp_udp_chksum_err": 0,
"rx_ip_hdr_chksum_err": 0,
"rx_mcast_mismatch": 0,
"rx_merge_events": 0,
"rx_merge_packets": 0,
"rx_nodesc_trunc": 0,
"rx_noskb_drops": 0,
"rx_outer_ip_hdr_chksum_err": 0,
"rx_outer_tcp_udp_chksum_err": 0,
"rx_reset": 0,
"rx_tcp_udp_chksum_err": 0,
"rx_tobe_disc": 0,
"tx-0.tx_packets": 85843565,
"tx-1.tx_packets": 108642725,
"tx-2.tx_packets": 202596078,
"tx-3.tx_packets": 207561010,
"tx-4.tx_packets": 0,
"tx_cb_packets": 4,
"tx_merge_events": 11025,
"tx_pio_packets": 531928114,
"tx_pushes": 604643378,
"tx_tso_bursts": 0,
"tx_tso_fallbacks": 0,
"tx_tso_long_headers": 0,
}
eth3 := &InterfaceMock{"eth3", "driver1", "namespace1", eth3Stat, false, true}
interfaceMap[eth3.Name] = eth3
eth4Stat := map[string]uint64{
"interface_up": 1,
"port_rx_1024_to_15xx": 25167245,
"port_rx_128_to_255": 1573526387,
"port_rx_15xx_to_jumbo": 137819058,
"port_rx_256_to_511": 772038107,
"port_rx_512_to_1023": 78294457,
"port_rx_64": 8798065,
"port_rx_65_to_127": 450348015,
"port_rx_bad": 0,
"port_rx_bad_bytes": 0,
"port_rx_bad_gtjumbo": 0,
"port_rx_broadcast": 6428250,
"port_rx_bytes": 893460472634,
"port_rx_control": 0,
"port_rx_dp_di_dropped_packets": 2772680304,
"port_rx_dp_hlb_fetch": 0,
"port_rx_dp_hlb_wait": 0,
"port_rx_dp_q_disabled_packets": 0,
"port_rx_dp_streaming_packets": 0,
"port_rx_good": 3045991334,
"port_rx_good_bytes": 893460472927,
"port_rx_gtjumbo": 0,
"port_rx_lt64": 0,
"port_rx_multicast": 1639566045,
"port_rx_nodesc_drops": 0,
"port_rx_overflow": 0,
"port_rx_packets": 3045991334,
"port_rx_pause": 0,
"port_rx_pm_discard_bb_overflow": 0,
"port_rx_pm_discard_mapping": 0,
"port_rx_pm_discard_qbb": 0,
"port_rx_pm_discard_vfifo_full": 0,
"port_rx_pm_trunc_bb_overflow": 0,
"port_rx_pm_trunc_qbb": 0,
"port_rx_pm_trunc_vfifo_full": 0,
"port_rx_unicast": 1399997040,
"port_tx_1024_to_15xx": 236,
"port_tx_128_to_255": 275090219,
"port_tx_15xx_to_jumbo": 926,
"port_tx_256_to_511": 48567221,
"port_tx_512_to_1023": 5142016,
"port_tx_64": 113903973,
"port_tx_65_to_127": 161935699,
"port_tx_broadcast": 8,
"port_tx_bytes": 94357131016,
"port_tx_control": 0,
"port_tx_lt64": 0,
"port_tx_multicast": 325891647,
"port_tx_packets": 604640290,
"port_tx_pause": 0,
"port_tx_unicast": 278748635,
"ptp_bad_syncs": 1,
"ptp_fast_syncs": 1,
"ptp_filter_matches": 0,
"ptp_good_syncs": 136151,
"ptp_invalid_sync_windows": 0,
"ptp_no_time_syncs": 1,
"ptp_non_filter_matches": 0,
"ptp_oversize_sync_windows": 53,
"ptp_rx_no_timestamp": 0,
"ptp_rx_timestamp_packets": 0,
"ptp_sync_timeouts": 1,
"ptp_timestamp_packets": 0,
"ptp_tx_timestamp_packets": 0,
"ptp_undersize_sync_windows": 3,
"rx-0.rx_packets": 55659234,
"rx-1.rx_packets": 87880538,
"rx-2.rx_packets": 26746234,
"rx-3.rx_packets": 103026471,
"rx-4.rx_packets": 0,
"rx_eth_crc_err": 0,
"rx_frm_trunc": 0,
"rx_inner_ip_hdr_chksum_err": 0,
"rx_inner_tcp_udp_chksum_err": 0,
"rx_ip_hdr_chksum_err": 0,
"rx_mcast_mismatch": 0,
"rx_merge_events": 0,
"rx_merge_packets": 0,
"rx_nodesc_trunc": 0,
"rx_noskb_drops": 0,
"rx_outer_ip_hdr_chksum_err": 0,
"rx_outer_tcp_udp_chksum_err": 0,
"rx_reset": 0,
"rx_tcp_udp_chksum_err": 0,
"rx_tobe_disc": 0,
"tx-0.tx_packets": 85843565,
"tx-1.tx_packets": 108642725,
"tx-2.tx_packets": 202596078,
"tx-3.tx_packets": 207561010,
"tx-4.tx_packets": 0,
"tx_cb_packets": 4,
"tx_merge_events": 11025,
"tx_pio_packets": 531928114,
"tx_pushes": 604643378,
"tx_tso_bursts": 0,
"tx_tso_fallbacks": 0,
"tx_tso_long_headers": 0,
}
eth4 := &InterfaceMock{"eth4", "driver1", "namespace2", eth4Stat, false, true}
interfaceMap[eth4.Name] = eth4
// dummy loopback including dummy stat to ensure that the ignore feature is working
lo0Stat := map[string]uint64{
"dummy": 0,
}
lo0 := &InterfaceMock{"lo0", "", lo0Stat, true, true}
lo0 := &InterfaceMock{"lo0", "", "", lo0Stat, true, true}
interfaceMap[lo0.Name] = lo0
c := &CommandEthtoolMock{interfaceMap}
@ -298,7 +545,7 @@ func setup() {
}
func toStringMapInterface(in map[string]uint64) map[string]interface{} {
var m = map[string]interface{}{}
m := map[string]interface{}{}
for k, v := range in {
m[k] = v
}
@ -306,7 +553,7 @@ func toStringMapInterface(in map[string]uint64) map[string]interface{} {
}
func toStringMapUint(in map[string]interface{}) map[string]uint64 {
var m = map[string]uint64{}
m := map[string]uint64{}
for k, v := range in {
t := v.(uint64)
m[k] = t
@ -332,6 +579,7 @@ func TestGather(t *testing.T) {
expectedTagsEth1 := map[string]string{
"interface": "eth1",
"driver": "driver1",
"namespace": "",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1)
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat)
@ -340,6 +588,7 @@ func TestGather(t *testing.T) {
expectedTagsEth2 := map[string]string{
"driver": "driver1",
"interface": "eth2",
"namespace": "",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2)
}
@ -364,6 +613,7 @@ func TestGatherIncludeInterfaces(t *testing.T) {
expectedTagsEth1 := map[string]string{
"interface": "eth1",
"driver": "driver1",
"namespace": "",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1)
@ -374,6 +624,7 @@ func TestGatherIncludeInterfaces(t *testing.T) {
expectedTagsEth2 := map[string]string{
"interface": "eth2",
"driver": "driver1",
"namespace": "",
}
acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2)
}
@ -398,6 +649,7 @@ func TestGatherIgnoreInterfaces(t *testing.T) {
expectedTagsEth1 := map[string]string{
"interface": "eth1",
"driver": "driver1",
"namespace": "",
}
acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1)
@ -408,6 +660,7 @@ func TestGatherIgnoreInterfaces(t *testing.T) {
expectedTagsEth2 := map[string]string{
"interface": "eth2",
"driver": "driver1",
"namespace": "",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2)
}
@ -432,10 +685,94 @@ func TestSkipMetricsForInterfaceDown(t *testing.T) {
expectedTagsEth1 := map[string]string{
"interface": "eth1",
"driver": "driver1",
"namespace": "",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1)
}
func TestGatherIncludeNamespaces(t *testing.T) {
setup()
var acc testutil.Accumulator
command.NamespaceInclude = append(command.NamespaceInclude, "namespace1")
err := command.Init()
require.NoError(t, err)
err = command.Gather(&acc)
require.NoError(t, err)
require.Len(t, acc.Metrics, 1)
// Should contain eth3
expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].Stat)
expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"]
expectedFieldsEth3["interface_up"] = true
expectedTagsEth3 := map[string]string{
"interface": "eth3",
"driver": "driver1",
"namespace": "namespace1",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3)
// Should not contain eth2
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat)
expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"]
expectedFieldsEth2["interface_up"] = false
expectedTagsEth2 := map[string]string{
"interface": "eth2",
"driver": "driver1",
"namespace": "",
}
acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2)
}
func TestGatherIgnoreNamespaces(t *testing.T) {
setup()
var acc testutil.Accumulator
command.NamespaceExclude = append(command.NamespaceExclude, "namespace2")
err := command.Init()
require.NoError(t, err)
err = command.Gather(&acc)
require.NoError(t, err)
require.Len(t, acc.Metrics, 3)
// Should not contain eth4
expectedFieldsEth4 := toStringMapInterface(interfaceMap["eth4"].Stat)
expectedFieldsEth4["interface_up_counter"] = expectedFieldsEth4["interface_up"]
expectedFieldsEth4["interface_up"] = true
expectedTagsEth4 := map[string]string{
"interface": "eth4",
"driver": "driver1",
"namespace": "namespace2",
}
acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth4, expectedTagsEth4)
// Should contain eth2
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat)
expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"]
expectedFieldsEth2["interface_up"] = false
expectedTagsEth2 := map[string]string{
"interface": "eth2",
"driver": "driver1",
"namespace": "",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2)
// Should contain eth3
expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].Stat)
expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"]
expectedFieldsEth3["interface_up"] = true
expectedTagsEth3 := map[string]string{
"interface": "eth3",
"driver": "driver1",
"namespace": "namespace1",
}
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3)
}
type TestCase struct {
normalization []string
stats map[string]interface{}
@ -483,7 +820,8 @@ func TestNormalizedKeys(t *testing.T) {
"port_rx": uint64(1),
"port_tx": uint64(0),
"interface_up": true,
"interface_up_counter": uint64(0)},
"interface_up_counter": uint64(0),
},
},
{
normalization: []string{"underscore", "lower", "snakecase", "trim"},
@ -542,10 +880,11 @@ func TestNormalizedKeys(t *testing.T) {
}
for _, c := range cases {
eth0 := &InterfaceMock{"eth0", "e1000e", toStringMapUint(c.stats), false, true}
eth0 := &InterfaceMock{"eth0", "e1000e", "", toStringMapUint(c.stats), false, true}
expectedTags := map[string]string{
"interface": eth0.Name,
"driver": eth0.DriverName,
"namespace": "",
}
interfaceMap = make(map[string]*InterfaceMock)

View File

@ -0,0 +1,15 @@
package ethtool
import "net"
type Namespace interface {
Name() string
Interfaces() ([]NamespacedInterface, error)
DriverName(intf NamespacedInterface) (string, error)
Stats(intf NamespacedInterface) (map[string]uint64, error)
}
type NamespacedInterface struct {
net.Interface
Namespace Namespace
}

View File

@ -0,0 +1,133 @@
package ethtool
import (
"net"
"runtime"
ethtoolLib "github.com/safchain/ethtool"
"github.com/vishvananda/netns"
"github.com/influxdata/telegraf"
)
type NamespacedAction struct {
result chan<- NamespacedResult
f func(*NamespaceGoroutine) (interface{}, error)
}
type NamespacedResult struct {
Result interface{}
Error error
}
type NamespaceGoroutine struct {
name string
handle netns.NsHandle
ethtoolClient *ethtoolLib.Ethtool
c chan NamespacedAction
Log telegraf.Logger
}
func (n *NamespaceGoroutine) Name() string {
return n.name
}
func (n *NamespaceGoroutine) Interfaces() ([]NamespacedInterface, error) {
interfaces, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) {
interfaces, err := net.Interfaces()
if err != nil {
n.Log.Errorf(`Could not get interfaces in namespace "%s": %s`, n.name, err)
return nil, err
}
namespacedInterfaces := make([]NamespacedInterface, 0, len(interfaces))
for _, iface := range interfaces {
namespacedInterfaces = append(
namespacedInterfaces,
NamespacedInterface{
Interface: iface,
Namespace: n,
},
)
}
return namespacedInterfaces, nil
})
return interfaces.([]NamespacedInterface), err
}
func (n *NamespaceGoroutine) DriverName(intf NamespacedInterface) (string, error) {
driver, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) {
return n.ethtoolClient.DriverName(intf.Name)
})
return driver.(string), err
}
func (n *NamespaceGoroutine) Stats(intf NamespacedInterface) (map[string]uint64, error) {
driver, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) {
return n.ethtoolClient.Stats(intf.Name)
})
return driver.(map[string]uint64), err
}
// Start locks a goroutine to an OS thread and ties it to the namespace, then
// loops for actions to run in the namespace.
func (n *NamespaceGoroutine) Start() error {
n.c = make(chan NamespacedAction)
started := make(chan error)
go func() {
// We're going to hold this thread locked permanently. We're going to
// do this for every namespace. This makes it very likely that the Go
// runtime will spin up new threads to replace it. To avoid thread
// leaks, we don't unlock when we're done and instead let this thread
// die.
runtime.LockOSThread()
// If this goroutine is for the initial namespace, we are already in
// the correct namespace. Switching would require CAP_SYS_ADMIN, which
// we may not have. Don't switch if the desired namespace matches the
// current one.
initialNamespace, err := netns.Get()
if err != nil {
n.Log.Errorf("Could not get initial namespace: %s", err)
started <- err
return
}
if !initialNamespace.Equal(n.handle) {
if err := netns.Set(n.handle); err != nil {
n.Log.Errorf(`Could not switch to namespace "%s": %s`, n.name, err)
started <- err
return
}
}
// Every namespace needs its own connection to ethtool
e, err := ethtoolLib.NewEthtool()
if err != nil {
n.Log.Errorf(`Could not create ethtool client for namespace "%s": %s`, n.name, err)
started <- err
return
}
n.ethtoolClient = e
started <- nil
for command := range n.c {
result, err := command.f(n)
command.result <- NamespacedResult{
Result: result,
Error: err,
}
close(command.result)
}
}()
return <-started
}
// Do runs a function inside the OS thread tied to the namespace.
func (n *NamespaceGoroutine) Do(f func(*NamespaceGoroutine) (interface{}, error)) (interface{}, error) {
result := make(chan NamespacedResult)
n.c <- NamespacedAction{
result: result,
f: f,
}
r := <-result
return r.Result, r.Error
}

View File

@ -12,6 +12,23 @@
## - skip: ignore interfaces that are marked down
# down_interfaces = "expose"
## Reading statistics from interfaces in additional namespaces is also
## supported, so long as the namespaces are named (have a symlink in
## /var/run/netns). The telegraf process will also need the CAP_SYS_ADMIN
## permission.
## By default, only the current namespace will be used. For additional
## namespace support, at least one of `namespace_include` and
## `namespace_exclude` must be provided.
## To include all namespaces, set `namespace_include` to `["*"]`.
## The initial namespace (if anonymous) can be specified with the empty
## string ("").
## List of namespaces to pull metrics for
# namespace_include = []
## List of namespace to ignore when pulling metrics.
# namespace_exclude = []
## Some drivers declare statistics with extra whitespace, different spacing,
## and mix cases. This list, when enabled, can be used to clean the keys.
## Here are the current possible normalizations: