Add ifname processor plugin (#7763)
This commit is contained in:
parent
a19befe376
commit
6f9c623986
|
|
@ -0,0 +1,34 @@
|
|||
package snmp
|
||||
|
||||
import (
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
)
|
||||
|
||||
type ClientConfig struct {
|
||||
// Timeout to wait for a response.
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
Retries int `toml:"retries"`
|
||||
// Values: 1, 2, 3
|
||||
Version uint8 `toml:"version"`
|
||||
|
||||
// Parameters for Version 1 & 2
|
||||
Community string `toml:"community"`
|
||||
|
||||
// Parameters for Version 2 & 3
|
||||
MaxRepetitions uint8 `toml:"max_repetitions"`
|
||||
|
||||
// Parameters for Version 3
|
||||
ContextName string `toml:"context_name"`
|
||||
// Values: "noAuthNoPriv", "authNoPriv", "authPriv"
|
||||
SecLevel string `toml:"sec_level"`
|
||||
SecName string `toml:"sec_name"`
|
||||
// Values: "MD5", "SHA", "". Default: ""
|
||||
AuthProtocol string `toml:"auth_protocol"`
|
||||
AuthPassword string `toml:"auth_password"`
|
||||
// Values: "DES", "AES", "". Default: ""
|
||||
PrivProtocol string `toml:"priv_protocol"`
|
||||
PrivPassword string `toml:"priv_password"`
|
||||
EngineID string `toml:"-"`
|
||||
EngineBoots uint32 `toml:"-"`
|
||||
EngineTime uint32 `toml:"-"`
|
||||
}
|
||||
|
|
@ -0,0 +1,180 @@
|
|||
package snmp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/soniah/gosnmp"
|
||||
)
|
||||
|
||||
// GosnmpWrapper wraps a *gosnmp.GoSNMP object so we can use it as a snmpConnection.
|
||||
type GosnmpWrapper struct {
|
||||
*gosnmp.GoSNMP
|
||||
}
|
||||
|
||||
// Host returns the value of GoSNMP.Target.
|
||||
func (gsw GosnmpWrapper) Host() string {
|
||||
return gsw.Target
|
||||
}
|
||||
|
||||
// Walk wraps GoSNMP.Walk() or GoSNMP.BulkWalk(), depending on whether the
|
||||
// connection is using SNMPv1 or newer.
|
||||
// Also, if any error is encountered, it will just once reconnect and try again.
|
||||
func (gsw GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {
|
||||
var err error
|
||||
// On error, retry once.
|
||||
// Unfortunately we can't distinguish between an error returned by gosnmp, and one returned by the walk function.
|
||||
for i := 0; i < 2; i++ {
|
||||
if gsw.Version == gosnmp.Version1 {
|
||||
err = gsw.GoSNMP.Walk(oid, fn)
|
||||
} else {
|
||||
err = gsw.GoSNMP.BulkWalk(oid, fn)
|
||||
}
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err := gsw.GoSNMP.Connect(); err != nil {
|
||||
return fmt.Errorf("reconnecting: %w", err)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Get wraps GoSNMP.GET().
|
||||
// If any error is encountered, it will just once reconnect and try again.
|
||||
func (gsw GosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
|
||||
var err error
|
||||
var pkt *gosnmp.SnmpPacket
|
||||
for i := 0; i < 2; i++ {
|
||||
pkt, err = gsw.GoSNMP.Get(oids)
|
||||
if err == nil {
|
||||
return pkt, nil
|
||||
}
|
||||
if err := gsw.GoSNMP.Connect(); err != nil {
|
||||
return nil, fmt.Errorf("reconnecting: %w", err)
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func NewWrapper(s ClientConfig) (GosnmpWrapper, error) {
|
||||
gs := GosnmpWrapper{&gosnmp.GoSNMP{}}
|
||||
|
||||
gs.Timeout = s.Timeout.Duration
|
||||
|
||||
gs.Retries = s.Retries
|
||||
|
||||
switch s.Version {
|
||||
case 3:
|
||||
gs.Version = gosnmp.Version3
|
||||
case 2, 0:
|
||||
gs.Version = gosnmp.Version2c
|
||||
case 1:
|
||||
gs.Version = gosnmp.Version1
|
||||
default:
|
||||
return GosnmpWrapper{}, fmt.Errorf("invalid version")
|
||||
}
|
||||
|
||||
if s.Version < 3 {
|
||||
if s.Community == "" {
|
||||
gs.Community = "public"
|
||||
} else {
|
||||
gs.Community = s.Community
|
||||
}
|
||||
}
|
||||
|
||||
gs.MaxRepetitions = s.MaxRepetitions
|
||||
|
||||
if s.Version == 3 {
|
||||
gs.ContextName = s.ContextName
|
||||
|
||||
sp := &gosnmp.UsmSecurityParameters{}
|
||||
gs.SecurityParameters = sp
|
||||
gs.SecurityModel = gosnmp.UserSecurityModel
|
||||
|
||||
switch strings.ToLower(s.SecLevel) {
|
||||
case "noauthnopriv", "":
|
||||
gs.MsgFlags = gosnmp.NoAuthNoPriv
|
||||
case "authnopriv":
|
||||
gs.MsgFlags = gosnmp.AuthNoPriv
|
||||
case "authpriv":
|
||||
gs.MsgFlags = gosnmp.AuthPriv
|
||||
default:
|
||||
return GosnmpWrapper{}, fmt.Errorf("invalid secLevel")
|
||||
}
|
||||
|
||||
sp.UserName = s.SecName
|
||||
|
||||
switch strings.ToLower(s.AuthProtocol) {
|
||||
case "md5":
|
||||
sp.AuthenticationProtocol = gosnmp.MD5
|
||||
case "sha":
|
||||
sp.AuthenticationProtocol = gosnmp.SHA
|
||||
case "":
|
||||
sp.AuthenticationProtocol = gosnmp.NoAuth
|
||||
default:
|
||||
return GosnmpWrapper{}, fmt.Errorf("invalid authProtocol")
|
||||
}
|
||||
|
||||
sp.AuthenticationPassphrase = s.AuthPassword
|
||||
|
||||
switch strings.ToLower(s.PrivProtocol) {
|
||||
case "des":
|
||||
sp.PrivacyProtocol = gosnmp.DES
|
||||
case "aes":
|
||||
sp.PrivacyProtocol = gosnmp.AES
|
||||
case "":
|
||||
sp.PrivacyProtocol = gosnmp.NoPriv
|
||||
default:
|
||||
return GosnmpWrapper{}, fmt.Errorf("invalid privProtocol")
|
||||
}
|
||||
|
||||
sp.PrivacyPassphrase = s.PrivPassword
|
||||
|
||||
sp.AuthoritativeEngineID = s.EngineID
|
||||
|
||||
sp.AuthoritativeEngineBoots = s.EngineBoots
|
||||
|
||||
sp.AuthoritativeEngineTime = s.EngineTime
|
||||
}
|
||||
return gs, nil
|
||||
}
|
||||
|
||||
// SetAgent takes a url (scheme://host:port) and sets the wrapped
|
||||
// GoSNMP struct's corresponding fields. This shouldn't be called
|
||||
// after using the wrapped GoSNMP struct, for example after
|
||||
// connecting.
|
||||
func (gs *GosnmpWrapper) SetAgent(agent string) error {
|
||||
if !strings.Contains(agent, "://") {
|
||||
agent = "udp://" + agent
|
||||
}
|
||||
|
||||
u, err := url.Parse(agent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch u.Scheme {
|
||||
case "tcp":
|
||||
gs.Transport = "tcp"
|
||||
case "", "udp":
|
||||
gs.Transport = "udp"
|
||||
default:
|
||||
return fmt.Errorf("unsupported scheme: %v", u.Scheme)
|
||||
}
|
||||
|
||||
gs.Target = u.Hostname()
|
||||
|
||||
portStr := u.Port()
|
||||
if portStr == "" {
|
||||
portStr = "161"
|
||||
}
|
||||
port, err := strconv.ParseUint(portStr, 10, 16)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing port: %w", err)
|
||||
}
|
||||
gs.Port = uint16(port)
|
||||
return nil
|
||||
}
|
||||
|
|
@ -7,7 +7,6 @@ import (
|
|||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"net/url"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -16,6 +15,7 @@ import (
|
|||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/snmp"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/wlog"
|
||||
"github.com/soniah/gosnmp"
|
||||
|
|
@ -82,10 +82,7 @@ func execCmd(arg0 string, args ...string) ([]byte, error) {
|
|||
out, err := execCommand(arg0, args...).Output()
|
||||
if err != nil {
|
||||
if err, ok := err.(*exec.ExitError); ok {
|
||||
return nil, NestedError{
|
||||
Err: err,
|
||||
NestedErr: fmt.Errorf("%s", bytes.TrimRight(err.Stderr, "\r\n")),
|
||||
}
|
||||
return nil, fmt.Errorf("%s: %w", bytes.TrimRight(err.Stderr, "\r\n"), err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -97,32 +94,8 @@ type Snmp struct {
|
|||
// The SNMP agent to query. Format is [SCHEME://]ADDR[:PORT] (e.g.
|
||||
// udp://1.2.3.4:161). If the scheme is not specified then "udp" is used.
|
||||
Agents []string `toml:"agents"`
|
||||
// Timeout to wait for a response.
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
Retries int `toml:"retries"`
|
||||
// Values: 1, 2, 3
|
||||
Version uint8 `toml:"version"`
|
||||
|
||||
// Parameters for Version 1 & 2
|
||||
Community string `toml:"community"`
|
||||
|
||||
// Parameters for Version 2 & 3
|
||||
MaxRepetitions uint8 `toml:"max_repetitions"`
|
||||
|
||||
// Parameters for Version 3
|
||||
ContextName string `toml:"context_name"`
|
||||
// Values: "noAuthNoPriv", "authNoPriv", "authPriv"
|
||||
SecLevel string `toml:"sec_level"`
|
||||
SecName string `toml:"sec_name"`
|
||||
// Values: "MD5", "SHA", "". Default: ""
|
||||
AuthProtocol string `toml:"auth_protocol"`
|
||||
AuthPassword string `toml:"auth_password"`
|
||||
// Values: "DES", "AES", "". Default: ""
|
||||
PrivProtocol string `toml:"priv_protocol"`
|
||||
PrivPassword string `toml:"priv_password"`
|
||||
EngineID string `toml:"-"`
|
||||
EngineBoots uint32 `toml:"-"`
|
||||
EngineTime uint32 `toml:"-"`
|
||||
snmp.ClientConfig
|
||||
|
||||
Tables []Table `toml:"table"`
|
||||
|
||||
|
|
@ -144,14 +117,14 @@ func (s *Snmp) init() error {
|
|||
s.connectionCache = make([]snmpConnection, len(s.Agents))
|
||||
|
||||
for i := range s.Tables {
|
||||
if err := s.Tables[i].init(); err != nil {
|
||||
return Errorf(err, "initializing table %s", s.Tables[i].Name)
|
||||
if err := s.Tables[i].Init(); err != nil {
|
||||
return fmt.Errorf("initializing table %s: %w", s.Tables[i].Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
for i := range s.Fields {
|
||||
if err := s.Fields[i].init(); err != nil {
|
||||
return Errorf(err, "initializing field %s", s.Fields[i].Name)
|
||||
return fmt.Errorf("initializing field %s: %w", s.Fields[i].Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -181,8 +154,8 @@ type Table struct {
|
|||
initialized bool
|
||||
}
|
||||
|
||||
// init() builds & initializes the nested fields.
|
||||
func (t *Table) init() error {
|
||||
// Init() builds & initializes the nested fields.
|
||||
func (t *Table) Init() error {
|
||||
if t.initialized {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -194,7 +167,7 @@ func (t *Table) init() error {
|
|||
// initialize all the nested fields
|
||||
for i := range t.Fields {
|
||||
if err := t.Fields[i].init(); err != nil {
|
||||
return Errorf(err, "initializing field %s", t.Fields[i].Name)
|
||||
return fmt.Errorf("initializing field %s: %w", t.Fields[i].Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -266,7 +239,7 @@ func (f *Field) init() error {
|
|||
|
||||
_, oidNum, oidText, conversion, err := SnmpTranslate(f.Oid)
|
||||
if err != nil {
|
||||
return Errorf(err, "translating")
|
||||
return fmt.Errorf("translating: %w", err)
|
||||
}
|
||||
f.Oid = oidNum
|
||||
if f.Name == "" {
|
||||
|
|
@ -301,36 +274,30 @@ type RTableRow struct {
|
|||
Fields map[string]interface{}
|
||||
}
|
||||
|
||||
// NestedError wraps an error returned from deeper in the code.
|
||||
type NestedError struct {
|
||||
// Err is the error from where the NestedError was constructed.
|
||||
Err error
|
||||
// NestedError is the error that was passed back from the called function.
|
||||
NestedErr error
|
||||
type walkError struct {
|
||||
msg string
|
||||
err error
|
||||
}
|
||||
|
||||
// Error returns a concatenated string of all the nested errors.
|
||||
func (ne NestedError) Error() string {
|
||||
return ne.Err.Error() + ": " + ne.NestedErr.Error()
|
||||
func (e *walkError) Error() string {
|
||||
return e.msg
|
||||
}
|
||||
|
||||
// Errorf is a convenience function for constructing a NestedError.
|
||||
func Errorf(err error, msg string, format ...interface{}) error {
|
||||
return NestedError{
|
||||
NestedErr: err,
|
||||
Err: fmt.Errorf(msg, format...),
|
||||
}
|
||||
func (e *walkError) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("snmp", func() telegraf.Input {
|
||||
return &Snmp{
|
||||
Name: "snmp",
|
||||
Retries: 3,
|
||||
MaxRepetitions: 10,
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second},
|
||||
Version: 2,
|
||||
Community: "public",
|
||||
Name: "snmp",
|
||||
ClientConfig: snmp.ClientConfig{
|
||||
Retries: 3,
|
||||
MaxRepetitions: 10,
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second},
|
||||
Version: 2,
|
||||
Community: "public",
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -360,7 +327,7 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
|
|||
defer wg.Done()
|
||||
gs, err := s.getConnection(i)
|
||||
if err != nil {
|
||||
acc.AddError(Errorf(err, "agent %s", agent))
|
||||
acc.AddError(fmt.Errorf("agent %s: %w", agent, err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -371,13 +338,13 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
|
|||
}
|
||||
topTags := map[string]string{}
|
||||
if err := s.gatherTable(acc, gs, t, topTags, false); err != nil {
|
||||
acc.AddError(Errorf(err, "agent %s", agent))
|
||||
acc.AddError(fmt.Errorf("agent %s: %w", agent, err))
|
||||
}
|
||||
|
||||
// Now is the real tables.
|
||||
for _, t := range s.Tables {
|
||||
if err := s.gatherTable(acc, gs, t, topTags, true); err != nil {
|
||||
acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name))
|
||||
acc.AddError(fmt.Errorf("agent %s: gathering table %s: %w", agent, t.Name, err))
|
||||
}
|
||||
}
|
||||
}(i, agent)
|
||||
|
|
@ -447,19 +414,19 @@ func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) {
|
|||
// empty string. This results in all the non-table fields sharing the same
|
||||
// index, and being added on the same row.
|
||||
if pkt, err := gs.Get([]string{oid}); err != nil {
|
||||
return nil, Errorf(err, "performing get on field %s", f.Name)
|
||||
return nil, fmt.Errorf("performing get on field %s: %w", f.Name, err)
|
||||
} else if pkt != nil && len(pkt.Variables) > 0 && pkt.Variables[0].Type != gosnmp.NoSuchObject && pkt.Variables[0].Type != gosnmp.NoSuchInstance {
|
||||
ent := pkt.Variables[0]
|
||||
fv, err := fieldConvert(f.Conversion, ent.Value)
|
||||
if err != nil {
|
||||
return nil, Errorf(err, "converting %q (OID %s) for field %s", ent.Value, ent.Name, f.Name)
|
||||
return nil, fmt.Errorf("converting %q (OID %s) for field %s: %w", ent.Value, ent.Name, f.Name, err)
|
||||
}
|
||||
ifv[""] = fv
|
||||
}
|
||||
} else {
|
||||
err := gs.Walk(oid, func(ent gosnmp.SnmpPDU) error {
|
||||
if len(ent.Name) <= len(oid) || ent.Name[:len(oid)+1] != oid+"." {
|
||||
return NestedError{} // break the walk
|
||||
return &walkError{} // break the walk
|
||||
}
|
||||
|
||||
idx := ent.Name[len(oid):]
|
||||
|
|
@ -485,14 +452,20 @@ func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) {
|
|||
|
||||
fv, err := fieldConvert(f.Conversion, ent.Value)
|
||||
if err != nil {
|
||||
return Errorf(err, "converting %q (OID %s) for field %s", ent.Value, ent.Name, f.Name)
|
||||
return &walkError{
|
||||
msg: fmt.Sprintf("converting %q (OID %s) for field %s", ent.Value, ent.Name, f.Name),
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
ifv[idx] = fv
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
if _, ok := err.(NestedError); !ok {
|
||||
return nil, Errorf(err, "performing bulk walk for field %s", f.Name)
|
||||
// Our callback always wraps errors in a walkError.
|
||||
// If this error isn't a walkError, we know it's not
|
||||
// from the callback
|
||||
if _, ok := err.(*walkError); !ok {
|
||||
return nil, fmt.Errorf("performing bulk walk for field %s: %w", f.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -546,56 +519,6 @@ type snmpConnection interface {
|
|||
Get(oids []string) (*gosnmp.SnmpPacket, error)
|
||||
}
|
||||
|
||||
// gosnmpWrapper wraps a *gosnmp.GoSNMP object so we can use it as a snmpConnection.
|
||||
type gosnmpWrapper struct {
|
||||
*gosnmp.GoSNMP
|
||||
}
|
||||
|
||||
// Host returns the value of GoSNMP.Target.
|
||||
func (gsw gosnmpWrapper) Host() string {
|
||||
return gsw.Target
|
||||
}
|
||||
|
||||
// Walk wraps GoSNMP.Walk() or GoSNMP.BulkWalk(), depending on whether the
|
||||
// connection is using SNMPv1 or newer.
|
||||
// Also, if any error is encountered, it will just once reconnect and try again.
|
||||
func (gsw gosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {
|
||||
var err error
|
||||
// On error, retry once.
|
||||
// Unfortunately we can't distinguish between an error returned by gosnmp, and one returned by the walk function.
|
||||
for i := 0; i < 2; i++ {
|
||||
if gsw.Version == gosnmp.Version1 {
|
||||
err = gsw.GoSNMP.Walk(oid, fn)
|
||||
} else {
|
||||
err = gsw.GoSNMP.BulkWalk(oid, fn)
|
||||
}
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err := gsw.GoSNMP.Connect(); err != nil {
|
||||
return Errorf(err, "reconnecting")
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Get wraps GoSNMP.GET().
|
||||
// If any error is encountered, it will just once reconnect and try again.
|
||||
func (gsw gosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
|
||||
var err error
|
||||
var pkt *gosnmp.SnmpPacket
|
||||
for i := 0; i < 2; i++ {
|
||||
pkt, err = gsw.GoSNMP.Get(oids)
|
||||
if err == nil {
|
||||
return pkt, nil
|
||||
}
|
||||
if err := gsw.GoSNMP.Connect(); err != nil {
|
||||
return nil, Errorf(err, "reconnecting")
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// getConnection creates a snmpConnection (*gosnmp.GoSNMP) object and caches the
|
||||
// result using `agentIndex` as the cache key. This is done to allow multiple
|
||||
// connections to a single address. It is an error to use a connection in
|
||||
|
|
@ -607,119 +530,21 @@ func (s *Snmp) getConnection(idx int) (snmpConnection, error) {
|
|||
|
||||
agent := s.Agents[idx]
|
||||
|
||||
gs := gosnmpWrapper{&gosnmp.GoSNMP{}}
|
||||
s.connectionCache[idx] = gs
|
||||
|
||||
if !strings.Contains(agent, "://") {
|
||||
agent = "udp://" + agent
|
||||
var err error
|
||||
var gs snmp.GosnmpWrapper
|
||||
gs, err = snmp.NewWrapper(s.ClientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u, err := url.Parse(agent)
|
||||
gs.SetAgent(agent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch u.Scheme {
|
||||
case "tcp":
|
||||
gs.Transport = "tcp"
|
||||
case "", "udp":
|
||||
gs.Transport = "udp"
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported scheme: %v", u.Scheme)
|
||||
}
|
||||
|
||||
gs.Target = u.Hostname()
|
||||
|
||||
portStr := u.Port()
|
||||
if portStr == "" {
|
||||
portStr = "161"
|
||||
}
|
||||
port, err := strconv.ParseUint(portStr, 10, 16)
|
||||
if err != nil {
|
||||
return nil, Errorf(err, "parsing port")
|
||||
}
|
||||
gs.Port = uint16(port)
|
||||
|
||||
gs.Timeout = s.Timeout.Duration
|
||||
|
||||
gs.Retries = s.Retries
|
||||
|
||||
switch s.Version {
|
||||
case 3:
|
||||
gs.Version = gosnmp.Version3
|
||||
case 2, 0:
|
||||
gs.Version = gosnmp.Version2c
|
||||
case 1:
|
||||
gs.Version = gosnmp.Version1
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid version")
|
||||
}
|
||||
|
||||
if s.Version < 3 {
|
||||
if s.Community == "" {
|
||||
gs.Community = "public"
|
||||
} else {
|
||||
gs.Community = s.Community
|
||||
}
|
||||
}
|
||||
|
||||
gs.MaxRepetitions = s.MaxRepetitions
|
||||
|
||||
if s.Version == 3 {
|
||||
gs.ContextName = s.ContextName
|
||||
|
||||
sp := &gosnmp.UsmSecurityParameters{}
|
||||
gs.SecurityParameters = sp
|
||||
gs.SecurityModel = gosnmp.UserSecurityModel
|
||||
|
||||
switch strings.ToLower(s.SecLevel) {
|
||||
case "noauthnopriv", "":
|
||||
gs.MsgFlags = gosnmp.NoAuthNoPriv
|
||||
case "authnopriv":
|
||||
gs.MsgFlags = gosnmp.AuthNoPriv
|
||||
case "authpriv":
|
||||
gs.MsgFlags = gosnmp.AuthPriv
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid secLevel")
|
||||
}
|
||||
|
||||
sp.UserName = s.SecName
|
||||
|
||||
switch strings.ToLower(s.AuthProtocol) {
|
||||
case "md5":
|
||||
sp.AuthenticationProtocol = gosnmp.MD5
|
||||
case "sha":
|
||||
sp.AuthenticationProtocol = gosnmp.SHA
|
||||
case "":
|
||||
sp.AuthenticationProtocol = gosnmp.NoAuth
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid authProtocol")
|
||||
}
|
||||
|
||||
sp.AuthenticationPassphrase = s.AuthPassword
|
||||
|
||||
switch strings.ToLower(s.PrivProtocol) {
|
||||
case "des":
|
||||
sp.PrivacyProtocol = gosnmp.DES
|
||||
case "aes":
|
||||
sp.PrivacyProtocol = gosnmp.AES
|
||||
case "":
|
||||
sp.PrivacyProtocol = gosnmp.NoPriv
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid privProtocol")
|
||||
}
|
||||
|
||||
sp.PrivacyPassphrase = s.PrivPassword
|
||||
|
||||
sp.AuthoritativeEngineID = s.EngineID
|
||||
|
||||
sp.AuthoritativeEngineBoots = s.EngineBoots
|
||||
|
||||
sp.AuthoritativeEngineTime = s.EngineTime
|
||||
}
|
||||
s.connectionCache[idx] = gs
|
||||
|
||||
if err := gs.Connect(); err != nil {
|
||||
return nil, Errorf(err, "setting up connection")
|
||||
return nil, fmt.Errorf("setting up connection: %w", err)
|
||||
}
|
||||
|
||||
return gs, nil
|
||||
|
|
@ -881,7 +706,7 @@ func snmpTable(oid string) (mibName string, oidNum string, oidText string, field
|
|||
func snmpTableCall(oid string) (mibName string, oidNum string, oidText string, fields []Field, err error) {
|
||||
mibName, oidNum, oidText, _, err = SnmpTranslate(oid)
|
||||
if err != nil {
|
||||
return "", "", "", nil, Errorf(err, "translating")
|
||||
return "", "", "", nil, fmt.Errorf("translating: %w", err)
|
||||
}
|
||||
|
||||
mibPrefix := mibName + "::"
|
||||
|
|
@ -918,7 +743,7 @@ func snmpTableCall(oid string) (mibName string, oidNum string, oidText string, f
|
|||
// this won't actually try to run a query. The `-Ch` will just cause it to dump headers.
|
||||
out, err := execCmd("snmptable", "-Ch", "-Cl", "-c", "public", "127.0.0.1", oidFullName)
|
||||
if err != nil {
|
||||
return "", "", "", nil, Errorf(err, "getting table columns")
|
||||
return "", "", "", nil, fmt.Errorf("getting table columns: %w", err)
|
||||
}
|
||||
scanner := bufio.NewScanner(bytes.NewBuffer(out))
|
||||
scanner.Scan()
|
||||
|
|
@ -1016,7 +841,7 @@ func snmpTranslateCall(oid string) (mibName string, oidNum string, oidText strin
|
|||
scanner := bufio.NewScanner(bytes.NewBuffer(out))
|
||||
ok := scanner.Scan()
|
||||
if !ok && scanner.Err() != nil {
|
||||
return "", "", "", "", Errorf(scanner.Err(), "getting OID text")
|
||||
return "", "", "", "", fmt.Errorf("getting OID text: %w", scanner.Err())
|
||||
}
|
||||
|
||||
oidText = scanner.Text()
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/snmp"
|
||||
config "github.com/influxdata/telegraf/internal/snmp"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/influxdata/toml"
|
||||
|
|
@ -88,13 +90,15 @@ func TestSampleConfig(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
expected := &Snmp{
|
||||
Agents: []string{"udp://127.0.0.1:161"},
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second},
|
||||
Version: 2,
|
||||
Community: "public",
|
||||
MaxRepetitions: 10,
|
||||
Retries: 3,
|
||||
Name: "snmp",
|
||||
Agents: []string{"udp://127.0.0.1:161"},
|
||||
ClientConfig: config.ClientConfig{
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second},
|
||||
Version: 2,
|
||||
Community: "public",
|
||||
MaxRepetitions: 10,
|
||||
Retries: 3,
|
||||
},
|
||||
Name: "snmp",
|
||||
}
|
||||
require.Equal(t, expected, conf)
|
||||
}
|
||||
|
|
@ -141,7 +145,7 @@ func TestTableInit(t *testing.T) {
|
|||
{Oid: "TEST::description", Name: "description", IsTag: true},
|
||||
},
|
||||
}
|
||||
err := tbl.init()
|
||||
err := tbl.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, "testTable", tbl.Name)
|
||||
|
|
@ -232,18 +236,20 @@ func TestSnmpInit_noTranslate(t *testing.T) {
|
|||
|
||||
func TestGetSNMPConnection_v2(t *testing.T) {
|
||||
s := &Snmp{
|
||||
Agents: []string{"1.2.3.4:567", "1.2.3.4", "udp://127.0.0.1"},
|
||||
Timeout: internal.Duration{Duration: 3 * time.Second},
|
||||
Retries: 4,
|
||||
Version: 2,
|
||||
Community: "foo",
|
||||
Agents: []string{"1.2.3.4:567", "1.2.3.4", "udp://127.0.0.1"},
|
||||
ClientConfig: config.ClientConfig{
|
||||
Timeout: internal.Duration{Duration: 3 * time.Second},
|
||||
Retries: 4,
|
||||
Version: 2,
|
||||
Community: "foo",
|
||||
},
|
||||
}
|
||||
err := s.init()
|
||||
require.NoError(t, err)
|
||||
|
||||
gsc, err := s.getConnection(0)
|
||||
require.NoError(t, err)
|
||||
gs := gsc.(gosnmpWrapper)
|
||||
gs := gsc.(snmp.GosnmpWrapper)
|
||||
assert.Equal(t, "1.2.3.4", gs.Target)
|
||||
assert.EqualValues(t, 567, gs.Port)
|
||||
assert.Equal(t, gosnmp.Version2c, gs.Version)
|
||||
|
|
@ -252,14 +258,14 @@ func TestGetSNMPConnection_v2(t *testing.T) {
|
|||
|
||||
gsc, err = s.getConnection(1)
|
||||
require.NoError(t, err)
|
||||
gs = gsc.(gosnmpWrapper)
|
||||
gs = gsc.(snmp.GosnmpWrapper)
|
||||
assert.Equal(t, "1.2.3.4", gs.Target)
|
||||
assert.EqualValues(t, 161, gs.Port)
|
||||
assert.Equal(t, "udp", gs.Transport)
|
||||
|
||||
gsc, err = s.getConnection(2)
|
||||
require.NoError(t, err)
|
||||
gs = gsc.(gosnmpWrapper)
|
||||
gs = gsc.(snmp.GosnmpWrapper)
|
||||
assert.Equal(t, "127.0.0.1", gs.Target)
|
||||
assert.EqualValues(t, 161, gs.Port)
|
||||
assert.Equal(t, "udp", gs.Transport)
|
||||
|
|
@ -280,7 +286,7 @@ func TestGetSNMPConnectionTCP(t *testing.T) {
|
|||
wg.Add(1)
|
||||
gsc, err := s.getConnection(0)
|
||||
require.NoError(t, err)
|
||||
gs := gsc.(gosnmpWrapper)
|
||||
gs := gsc.(snmp.GosnmpWrapper)
|
||||
assert.Equal(t, "127.0.0.1", gs.Target)
|
||||
assert.EqualValues(t, 56789, gs.Port)
|
||||
assert.Equal(t, "tcp", gs.Transport)
|
||||
|
|
@ -299,26 +305,28 @@ func stubTCPServer(wg *sync.WaitGroup) {
|
|||
|
||||
func TestGetSNMPConnection_v3(t *testing.T) {
|
||||
s := &Snmp{
|
||||
Agents: []string{"1.2.3.4"},
|
||||
Version: 3,
|
||||
MaxRepetitions: 20,
|
||||
ContextName: "mycontext",
|
||||
SecLevel: "authPriv",
|
||||
SecName: "myuser",
|
||||
AuthProtocol: "md5",
|
||||
AuthPassword: "password123",
|
||||
PrivProtocol: "des",
|
||||
PrivPassword: "321drowssap",
|
||||
EngineID: "myengineid",
|
||||
EngineBoots: 1,
|
||||
EngineTime: 2,
|
||||
Agents: []string{"1.2.3.4"},
|
||||
ClientConfig: config.ClientConfig{
|
||||
Version: 3,
|
||||
MaxRepetitions: 20,
|
||||
ContextName: "mycontext",
|
||||
SecLevel: "authPriv",
|
||||
SecName: "myuser",
|
||||
AuthProtocol: "md5",
|
||||
AuthPassword: "password123",
|
||||
PrivProtocol: "des",
|
||||
PrivPassword: "321drowssap",
|
||||
EngineID: "myengineid",
|
||||
EngineBoots: 1,
|
||||
EngineTime: 2,
|
||||
},
|
||||
}
|
||||
err := s.init()
|
||||
require.NoError(t, err)
|
||||
|
||||
gsc, err := s.getConnection(0)
|
||||
require.NoError(t, err)
|
||||
gs := gsc.(gosnmpWrapper)
|
||||
gs := gsc.(snmp.GosnmpWrapper)
|
||||
assert.Equal(t, gs.Version, gosnmp.Version3)
|
||||
sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters)
|
||||
assert.Equal(t, "1.2.3.4", gsc.Host())
|
||||
|
|
@ -394,7 +402,9 @@ func TestGosnmpWrapper_walk_retry(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
conn := gs.Conn
|
||||
|
||||
gsw := gosnmpWrapper{gs}
|
||||
gsw := snmp.GosnmpWrapper{
|
||||
GoSNMP: gs,
|
||||
}
|
||||
err = gsw.Walk(".1.0.0", func(_ gosnmp.SnmpPDU) error { return nil })
|
||||
srvr.Close()
|
||||
wg.Wait()
|
||||
|
|
@ -442,7 +452,9 @@ func TestGosnmpWrapper_get_retry(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
conn := gs.Conn
|
||||
|
||||
gsw := gosnmpWrapper{gs}
|
||||
gsw := snmp.GosnmpWrapper{
|
||||
GoSNMP: gs,
|
||||
}
|
||||
_, err = gsw.Get([]string{".1.0.0"})
|
||||
srvr.Close()
|
||||
wg.Wait()
|
||||
|
|
@ -788,16 +800,3 @@ func TestSnmpTableCache_hit(t *testing.T) {
|
|||
assert.Equal(t, []Field{{Name: "d"}}, fields)
|
||||
assert.Equal(t, fmt.Errorf("e"), err)
|
||||
}
|
||||
|
||||
func TestError(t *testing.T) {
|
||||
e := fmt.Errorf("nested error")
|
||||
err := Errorf(e, "top error %d", 123)
|
||||
require.Error(t, err)
|
||||
|
||||
ne, ok := err.(NestedError)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, e, ne.NestedErr)
|
||||
|
||||
assert.Contains(t, err.Error(), "top error 123")
|
||||
assert.Contains(t, err.Error(), "nested error")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/processors/enum"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/execd"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/filepath"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/ifname"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/override"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/parser"
|
||||
_ "github.com/influxdata/telegraf/plugins/processors/pivot"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,75 @@
|
|||
# Network Interface Name Processor Plugin
|
||||
|
||||
The `ifname` plugin looks up network interface names using SNMP.
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
[[processors.ifname]]
|
||||
## Name of tag holding the interface number
|
||||
# tag = "ifIndex"
|
||||
|
||||
## Name of output tag where service name will be added
|
||||
# dest = "ifName"
|
||||
|
||||
## Name of tag of the SNMP agent to request the interface name from
|
||||
# agent = "agent"
|
||||
|
||||
## Timeout for each request.
|
||||
# timeout = "5s"
|
||||
|
||||
## SNMP version; can be 1, 2, or 3.
|
||||
# version = 2
|
||||
|
||||
## SNMP community string.
|
||||
# community = "public"
|
||||
|
||||
## Number of retries to attempt.
|
||||
# retries = 3
|
||||
|
||||
## The GETBULK max-repetitions parameter.
|
||||
# max_repetitions = 10
|
||||
|
||||
## SNMPv3 authentication and encryption options.
|
||||
##
|
||||
## Security Name.
|
||||
# sec_name = "myuser"
|
||||
## Authentication protocol; one of "MD5", "SHA", or "".
|
||||
# auth_protocol = "MD5"
|
||||
## Authentication password.
|
||||
# auth_password = "pass"
|
||||
## Security Level; one of "noAuthNoPriv", "authNoPriv", or "authPriv".
|
||||
# sec_level = "authNoPriv"
|
||||
## Context Name.
|
||||
# context_name = ""
|
||||
## Privacy protocol used for encrypted messages; one of "DES", "AES" or "".
|
||||
# priv_protocol = ""
|
||||
## Privacy password used for encrypted messages.
|
||||
# priv_password = ""
|
||||
|
||||
## max_parallel_lookups is the maximum number of SNMP requests to
|
||||
## make at the same time.
|
||||
# max_parallel_lookups = 100
|
||||
|
||||
## ordered controls whether or not the metrics need to stay in the
|
||||
## same order this plugin received them in. If false, this plugin
|
||||
## may change the order when data is cached. If you need metrics to
|
||||
## stay in order set this to true. keeping the metrics ordered may
|
||||
## be slightly slower
|
||||
# ordered = false
|
||||
```
|
||||
|
||||
### Example processing:
|
||||
|
||||
Example config:
|
||||
|
||||
```toml
|
||||
[[processors.ifname]]
|
||||
tag = "ifIndex"
|
||||
dest = "ifName"
|
||||
```
|
||||
|
||||
```diff
|
||||
- foo,ifIndex=2,agent=127.0.0.1 field=123 1502489900000000000
|
||||
+ foo,ifIndex=2,agent=127.0.0.1,ifName=eth0 field=123 1502489900000000000
|
||||
```
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
package ifname
|
||||
|
||||
// See https://girai.dev/blog/lru-cache-implementation-in-go/
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
)
|
||||
|
||||
// Type aliases let the implementation be more generic
|
||||
type keyType = string
|
||||
type valType = nameMap
|
||||
|
||||
type hashType map[keyType]*list.Element
|
||||
|
||||
type LRUCache struct {
|
||||
cap uint // capacity
|
||||
l *list.List // doubly linked list
|
||||
m hashType // hash table for checking if list node exists
|
||||
}
|
||||
|
||||
// Pair is the value of a list node.
|
||||
type Pair struct {
|
||||
key keyType
|
||||
value valType
|
||||
}
|
||||
|
||||
// initializes a new LRUCache.
|
||||
func NewLRUCache(capacity uint) LRUCache {
|
||||
return LRUCache{
|
||||
cap: capacity,
|
||||
l: new(list.List),
|
||||
m: make(hashType, capacity),
|
||||
}
|
||||
}
|
||||
|
||||
// Get a list node from the hash map.
|
||||
func (c *LRUCache) Get(key keyType) (valType, bool) {
|
||||
// check if list node exists
|
||||
if node, ok := c.m[key]; ok {
|
||||
val := node.Value.(*list.Element).Value.(Pair).value
|
||||
// move node to front
|
||||
c.l.MoveToFront(node)
|
||||
return val, true
|
||||
}
|
||||
return valType{}, false
|
||||
}
|
||||
|
||||
// Put key and value in the LRUCache
|
||||
func (c *LRUCache) Put(key keyType, value valType) {
|
||||
// check if list node exists
|
||||
if node, ok := c.m[key]; ok {
|
||||
// move the node to front
|
||||
c.l.MoveToFront(node)
|
||||
// update the value of a list node
|
||||
node.Value.(*list.Element).Value = Pair{key: key, value: value}
|
||||
} else {
|
||||
// delete the last list node if the list is full
|
||||
if uint(c.l.Len()) == c.cap {
|
||||
// get the key that we want to delete
|
||||
idx := c.l.Back().Value.(*list.Element).Value.(Pair).key
|
||||
// delete the node pointer in the hash map by key
|
||||
delete(c.m, idx)
|
||||
// remove the last list node
|
||||
c.l.Remove(c.l.Back())
|
||||
}
|
||||
// initialize a list node
|
||||
node := &list.Element{
|
||||
Value: Pair{
|
||||
key: key,
|
||||
value: value,
|
||||
},
|
||||
}
|
||||
// push the new list node into the list
|
||||
ptr := c.l.PushFront(node)
|
||||
// save the node pointer in the hash map
|
||||
c.m[key] = ptr
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
package ifname
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
c := NewLRUCache(2)
|
||||
|
||||
c.Put("ones", nameMap{1: "one"})
|
||||
twoMap := nameMap{2: "two"}
|
||||
c.Put("twos", twoMap)
|
||||
c.Put("threes", nameMap{3: "three"})
|
||||
|
||||
_, ok := c.Get("ones")
|
||||
require.False(t, ok)
|
||||
|
||||
v, ok := c.Get("twos")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, twoMap, v)
|
||||
}
|
||||
|
|
@ -0,0 +1,370 @@
|
|||
package ifname
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/snmp"
|
||||
si "github.com/influxdata/telegraf/plugins/inputs/snmp"
|
||||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
"github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
|
||||
)
|
||||
|
||||
var sampleConfig = `
|
||||
## Name of tag holding the interface number
|
||||
# tag = "ifIndex"
|
||||
|
||||
## Name of output tag where service name will be added
|
||||
# dest = "ifName"
|
||||
|
||||
## Name of tag of the SNMP agent to request the interface name from
|
||||
# agent = "agent"
|
||||
|
||||
## Timeout for each request.
|
||||
# timeout = "5s"
|
||||
|
||||
## SNMP version; can be 1, 2, or 3.
|
||||
# version = 2
|
||||
|
||||
## SNMP community string.
|
||||
# community = "public"
|
||||
|
||||
## Number of retries to attempt.
|
||||
# retries = 3
|
||||
|
||||
## The GETBULK max-repetitions parameter.
|
||||
# max_repetitions = 10
|
||||
|
||||
## SNMPv3 authentication and encryption options.
|
||||
##
|
||||
## Security Name.
|
||||
# sec_name = "myuser"
|
||||
## Authentication protocol; one of "MD5", "SHA", or "".
|
||||
# auth_protocol = "MD5"
|
||||
## Authentication password.
|
||||
# auth_password = "pass"
|
||||
## Security Level; one of "noAuthNoPriv", "authNoPriv", or "authPriv".
|
||||
# sec_level = "authNoPriv"
|
||||
## Context Name.
|
||||
# context_name = ""
|
||||
## Privacy protocol used for encrypted messages; one of "DES", "AES" or "".
|
||||
# priv_protocol = ""
|
||||
## Privacy password used for encrypted messages.
|
||||
# priv_password = ""
|
||||
|
||||
## max_parallel_lookups is the maximum number of SNMP requests to
|
||||
## make at the same time.
|
||||
# max_parallel_lookups = 100
|
||||
|
||||
## ordered controls whether or not the metrics need to stay in the
|
||||
## same order this plugin received them in. If false, this plugin
|
||||
## may change the order when data is cached. If you need metrics to
|
||||
## stay in order set this to true. keeping the metrics ordered may
|
||||
## be slightly slower
|
||||
# ordered = false
|
||||
`
|
||||
|
||||
type nameMap map[uint64]string
|
||||
type mapFunc func(agent string) (nameMap, error)
|
||||
|
||||
type makeTableFunc func(string) (*si.Table, error)
|
||||
|
||||
type sigMap map[string](chan struct{})
|
||||
|
||||
type IfName struct {
|
||||
SourceTag string `toml:"tag"`
|
||||
DestTag string `toml:"dest"`
|
||||
AgentTag string `toml:"agent"`
|
||||
|
||||
snmp.ClientConfig
|
||||
|
||||
CacheSize uint `toml:"max_cache_entries"`
|
||||
MaxParallelLookups int `toml:"max_parallel_lookups"`
|
||||
Ordered bool `toml:"ordered"`
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
ifTable *si.Table `toml:"-"`
|
||||
ifXTable *si.Table `toml:"-"`
|
||||
|
||||
rwLock sync.RWMutex `toml:"-"`
|
||||
cache *LRUCache `toml:"-"`
|
||||
|
||||
parallel parallel.Parallel `toml:"-"`
|
||||
acc telegraf.Accumulator `toml:"-"`
|
||||
|
||||
getMapRemote mapFunc `toml:"-"`
|
||||
makeTable makeTableFunc `toml:"-"`
|
||||
|
||||
gsBase snmp.GosnmpWrapper `toml:"-"`
|
||||
|
||||
sigs sigMap `toml:"-"`
|
||||
sigsLock sync.Mutex `toml:"-"`
|
||||
}
|
||||
|
||||
func (d *IfName) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (d *IfName) Description() string {
|
||||
return "Add a tag of the network interface name looked up over SNMP by interface number"
|
||||
}
|
||||
|
||||
func (d *IfName) Init() error {
|
||||
d.getMapRemote = d.getMapRemoteNoMock
|
||||
d.makeTable = makeTableNoMock
|
||||
|
||||
c := NewLRUCache(d.CacheSize)
|
||||
d.cache = &c
|
||||
|
||||
d.sigs = make(sigMap)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *IfName) addTag(metric telegraf.Metric) error {
|
||||
agent, ok := metric.GetTag(d.AgentTag)
|
||||
if !ok {
|
||||
//agent tag missing
|
||||
return nil
|
||||
}
|
||||
|
||||
num_s, ok := metric.GetTag(d.SourceTag)
|
||||
if !ok {
|
||||
//source tag missing
|
||||
return nil
|
||||
}
|
||||
|
||||
num, err := strconv.ParseUint(num_s, 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't parse source tag as uint")
|
||||
}
|
||||
|
||||
m, err := d.getMap(agent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't retrieve the table of interface names: %w", err)
|
||||
}
|
||||
|
||||
name, found := m[num]
|
||||
if !found {
|
||||
return fmt.Errorf("interface number %d isn't in the table of interface names", num)
|
||||
}
|
||||
metric.AddTag(d.DestTag, name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *IfName) Start(acc telegraf.Accumulator) error {
|
||||
d.acc = acc
|
||||
|
||||
var err error
|
||||
d.gsBase, err = snmp.NewWrapper(d.ClientConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing SNMP client config: %w", err)
|
||||
}
|
||||
|
||||
d.ifTable, err = d.makeTable("IF-MIB::ifTable")
|
||||
if err != nil {
|
||||
return fmt.Errorf("looking up ifTable in local MIB: %w", err)
|
||||
}
|
||||
d.ifXTable, err = d.makeTable("IF-MIB::ifXTable")
|
||||
if err != nil {
|
||||
return fmt.Errorf("looking up ifXTable in local MIB: %w", err)
|
||||
}
|
||||
|
||||
fn := func(m telegraf.Metric) []telegraf.Metric {
|
||||
err := d.addTag(m)
|
||||
if err != nil {
|
||||
d.Log.Debugf("Error adding tag %v", err)
|
||||
}
|
||||
return []telegraf.Metric{m}
|
||||
}
|
||||
|
||||
if d.Ordered {
|
||||
d.parallel = parallel.NewOrdered(acc, fn, 10000, d.MaxParallelLookups)
|
||||
} else {
|
||||
d.parallel = parallel.NewUnordered(acc, fn, d.MaxParallelLookups)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *IfName) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
|
||||
d.parallel.Enqueue(metric)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *IfName) Stop() error {
|
||||
d.parallel.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// getMap gets the interface names map either from cache or from the SNMP
|
||||
// agent
|
||||
func (d *IfName) getMap(agent string) (nameMap, error) {
|
||||
var sig chan struct{}
|
||||
|
||||
// Check cache
|
||||
d.rwLock.RLock()
|
||||
m, ok := d.cache.Get(agent)
|
||||
d.rwLock.RUnlock()
|
||||
if ok {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Is this the first request for this agent?
|
||||
d.sigsLock.Lock()
|
||||
sig, found := d.sigs[agent]
|
||||
if !found {
|
||||
s := make(chan struct{})
|
||||
d.sigs[agent] = s
|
||||
sig = s
|
||||
}
|
||||
d.sigsLock.Unlock()
|
||||
|
||||
if found {
|
||||
// This is not the first request. Wait for first to finish.
|
||||
<-sig
|
||||
// Check cache again
|
||||
d.rwLock.RLock()
|
||||
m, ok := d.cache.Get(agent)
|
||||
d.rwLock.RUnlock()
|
||||
if ok {
|
||||
return m, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("getting remote table from cache")
|
||||
}
|
||||
}
|
||||
|
||||
// The cache missed and this is the first request for this
|
||||
// agent.
|
||||
|
||||
// Make the SNMP request
|
||||
m, err := d.getMapRemote(agent)
|
||||
if err != nil {
|
||||
//failure. signal without saving to cache
|
||||
d.sigsLock.Lock()
|
||||
close(sig)
|
||||
delete(d.sigs, agent)
|
||||
d.sigsLock.Unlock()
|
||||
|
||||
return nil, fmt.Errorf("getting remote table: %w", err)
|
||||
}
|
||||
|
||||
// Cache it
|
||||
d.rwLock.Lock()
|
||||
d.cache.Put(agent, m)
|
||||
d.rwLock.Unlock()
|
||||
|
||||
// Signal any other waiting requests for this agent and clean up
|
||||
d.sigsLock.Lock()
|
||||
close(sig)
|
||||
delete(d.sigs, agent)
|
||||
d.sigsLock.Unlock()
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (d *IfName) getMapRemoteNoMock(agent string) (nameMap, error) {
|
||||
gs := d.gsBase
|
||||
err := gs.SetAgent(agent)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parsing agent tag: %w", err)
|
||||
}
|
||||
|
||||
err = gs.Connect()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connecting when fetching interface names: %w", err)
|
||||
}
|
||||
|
||||
//try ifXtable and ifName first. if that fails, fall back to
|
||||
//ifTable and ifDescr
|
||||
var m nameMap
|
||||
m, err = buildMap(gs, d.ifXTable, "ifName")
|
||||
if err == nil {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
m, err = buildMap(gs, d.ifTable, "ifDescr")
|
||||
if err == nil {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("fetching interface names: %w", err)
|
||||
}
|
||||
|
||||
func init() {
|
||||
processors.AddStreaming("ifname", func() telegraf.StreamingProcessor {
|
||||
return &IfName{
|
||||
SourceTag: "ifIndex",
|
||||
DestTag: "ifName",
|
||||
AgentTag: "agent",
|
||||
CacheSize: 100,
|
||||
MaxParallelLookups: 100,
|
||||
ClientConfig: snmp.ClientConfig{
|
||||
Retries: 3,
|
||||
MaxRepetitions: 10,
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second},
|
||||
Version: 2,
|
||||
Community: "public",
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func makeTableNoMock(tableName string) (*si.Table, error) {
|
||||
var err error
|
||||
tab := si.Table{
|
||||
Oid: tableName,
|
||||
IndexAsTag: true,
|
||||
}
|
||||
|
||||
err = tab.Init()
|
||||
if err != nil {
|
||||
//Init already wraps
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &tab, nil
|
||||
}
|
||||
|
||||
func buildMap(gs snmp.GosnmpWrapper, tab *si.Table, column string) (nameMap, error) {
|
||||
var err error
|
||||
|
||||
rtab, err := tab.Build(gs, true)
|
||||
if err != nil {
|
||||
//Build already wraps
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(rtab.Rows) == 0 {
|
||||
return nil, fmt.Errorf("empty table")
|
||||
}
|
||||
|
||||
t := make(nameMap)
|
||||
for _, v := range rtab.Rows {
|
||||
i_str, ok := v.Tags["index"]
|
||||
if !ok {
|
||||
//should always have an index tag because the table should
|
||||
//always have IndexAsTag true
|
||||
return nil, fmt.Errorf("no index tag")
|
||||
}
|
||||
i, err := strconv.ParseUint(i_str, 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("index tag isn't a uint")
|
||||
}
|
||||
name_if, ok := v.Fields[column]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field %s is missing", column)
|
||||
}
|
||||
name, ok := name_if.(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("field %s isn't a string", column)
|
||||
}
|
||||
|
||||
t[i] = name
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,155 @@
|
|||
package ifname
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/snmp"
|
||||
si "github.com/influxdata/telegraf/plugins/inputs/snmp"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTable(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
d := IfName{}
|
||||
d.Init()
|
||||
tab, err := d.makeTable("IF-MIB::ifTable")
|
||||
require.NoError(t, err)
|
||||
|
||||
config := snmp.ClientConfig{
|
||||
Version: 2,
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout
|
||||
}
|
||||
gs, err := snmp.NewWrapper(config)
|
||||
require.NoError(t, err)
|
||||
err = gs.SetAgent("127.0.0.1")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = gs.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
//could use ifIndex but oid index is always the same
|
||||
m, err := buildMap(gs, tab, "ifDescr")
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, m)
|
||||
}
|
||||
|
||||
func TestIfName(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
d := IfName{
|
||||
SourceTag: "ifIndex",
|
||||
DestTag: "ifName",
|
||||
AgentTag: "agent",
|
||||
CacheSize: 1000,
|
||||
ClientConfig: snmp.ClientConfig{
|
||||
Version: 2,
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout
|
||||
},
|
||||
}
|
||||
err := d.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
acc := testutil.Accumulator{}
|
||||
err = d.Start(&acc)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
m := testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"ifIndex": "1",
|
||||
"agent": "127.0.0.1",
|
||||
},
|
||||
map[string]interface{}{},
|
||||
time.Unix(0, 0),
|
||||
)
|
||||
|
||||
expected := testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{
|
||||
"ifIndex": "1",
|
||||
"agent": "127.0.0.1",
|
||||
"ifName": "lo",
|
||||
},
|
||||
map[string]interface{}{},
|
||||
time.Unix(0, 0),
|
||||
)
|
||||
|
||||
err = d.addTag(m)
|
||||
require.NoError(t, err)
|
||||
|
||||
testutil.RequireMetricEqual(t, expected, m)
|
||||
}
|
||||
|
||||
func TestGetMap(t *testing.T) {
|
||||
d := IfName{
|
||||
SourceTag: "ifIndex",
|
||||
DestTag: "ifName",
|
||||
AgentTag: "agent",
|
||||
CacheSize: 1000,
|
||||
ClientConfig: snmp.ClientConfig{
|
||||
Version: 2,
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout
|
||||
},
|
||||
}
|
||||
|
||||
// This test mocks the snmp transaction so don't run net-snmp
|
||||
// commands to look up table names.
|
||||
d.makeTable = func(agent string) (*si.Table, error) {
|
||||
return &si.Table{}, nil
|
||||
}
|
||||
err := d.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Request the same agent multiple times in goroutines. The first
|
||||
// request should make the mocked remote call and the others
|
||||
// should block until the response is cached, then return the
|
||||
// cached response.
|
||||
|
||||
expected := nameMap{
|
||||
1: "ifname1",
|
||||
2: "ifname2",
|
||||
}
|
||||
|
||||
var wgRemote sync.WaitGroup
|
||||
var remoteCalls int32
|
||||
|
||||
wgRemote.Add(1)
|
||||
d.getMapRemote = func(agent string) (nameMap, error) {
|
||||
atomic.AddInt32(&remoteCalls, 1)
|
||||
wgRemote.Wait() //don't return until all requests are made
|
||||
return expected, nil
|
||||
}
|
||||
|
||||
const thMax = 3
|
||||
var wgReq sync.WaitGroup
|
||||
|
||||
for th := 0; th < thMax; th++ {
|
||||
wgReq.Add(1)
|
||||
go func() {
|
||||
defer wgReq.Done()
|
||||
m, err := d.getMap("agent")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, m)
|
||||
}()
|
||||
}
|
||||
|
||||
//signal mocked remote call to finish
|
||||
wgRemote.Done()
|
||||
|
||||
//wait for requests to finish
|
||||
wgReq.Wait()
|
||||
|
||||
//remote call should only happen once
|
||||
require.Equal(t, int32(1), remoteCalls)
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue