diff --git a/internal/snmp/config.go b/internal/snmp/config.go new file mode 100644 index 000000000..e616e7570 --- /dev/null +++ b/internal/snmp/config.go @@ -0,0 +1,34 @@ +package snmp + +import ( + "github.com/influxdata/telegraf/internal" +) + +type ClientConfig struct { + // Timeout to wait for a response. + Timeout internal.Duration `toml:"timeout"` + Retries int `toml:"retries"` + // Values: 1, 2, 3 + Version uint8 `toml:"version"` + + // Parameters for Version 1 & 2 + Community string `toml:"community"` + + // Parameters for Version 2 & 3 + MaxRepetitions uint8 `toml:"max_repetitions"` + + // Parameters for Version 3 + ContextName string `toml:"context_name"` + // Values: "noAuthNoPriv", "authNoPriv", "authPriv" + SecLevel string `toml:"sec_level"` + SecName string `toml:"sec_name"` + // Values: "MD5", "SHA", "". Default: "" + AuthProtocol string `toml:"auth_protocol"` + AuthPassword string `toml:"auth_password"` + // Values: "DES", "AES", "". Default: "" + PrivProtocol string `toml:"priv_protocol"` + PrivPassword string `toml:"priv_password"` + EngineID string `toml:"-"` + EngineBoots uint32 `toml:"-"` + EngineTime uint32 `toml:"-"` +} diff --git a/internal/snmp/wrapper.go b/internal/snmp/wrapper.go new file mode 100644 index 000000000..23a15594e --- /dev/null +++ b/internal/snmp/wrapper.go @@ -0,0 +1,180 @@ +package snmp + +import ( + "fmt" + "net/url" + "strconv" + "strings" + + "github.com/soniah/gosnmp" +) + +// GosnmpWrapper wraps a *gosnmp.GoSNMP object so we can use it as a snmpConnection. +type GosnmpWrapper struct { + *gosnmp.GoSNMP +} + +// Host returns the value of GoSNMP.Target. +func (gsw GosnmpWrapper) Host() string { + return gsw.Target +} + +// Walk wraps GoSNMP.Walk() or GoSNMP.BulkWalk(), depending on whether the +// connection is using SNMPv1 or newer. +// Also, if any error is encountered, it will just once reconnect and try again. 
+func (gsw GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error { + var err error + // On error, retry once. + // Unfortunately we can't distinguish between an error returned by gosnmp, and one returned by the walk function. + for i := 0; i < 2; i++ { + if gsw.Version == gosnmp.Version1 { + err = gsw.GoSNMP.Walk(oid, fn) + } else { + err = gsw.GoSNMP.BulkWalk(oid, fn) + } + if err == nil { + return nil + } + if err := gsw.GoSNMP.Connect(); err != nil { + return fmt.Errorf("reconnecting: %w", err) + } + } + return err +} + +// Get wraps GoSNMP.GET(). +// If any error is encountered, it will just once reconnect and try again. +func (gsw GosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) { + var err error + var pkt *gosnmp.SnmpPacket + for i := 0; i < 2; i++ { + pkt, err = gsw.GoSNMP.Get(oids) + if err == nil { + return pkt, nil + } + if err := gsw.GoSNMP.Connect(); err != nil { + return nil, fmt.Errorf("reconnecting: %w", err) + } + } + return nil, err +} + +func NewWrapper(s ClientConfig) (GosnmpWrapper, error) { + gs := GosnmpWrapper{&gosnmp.GoSNMP{}} + + gs.Timeout = s.Timeout.Duration + + gs.Retries = s.Retries + + switch s.Version { + case 3: + gs.Version = gosnmp.Version3 + case 2, 0: + gs.Version = gosnmp.Version2c + case 1: + gs.Version = gosnmp.Version1 + default: + return GosnmpWrapper{}, fmt.Errorf("invalid version") + } + + if s.Version < 3 { + if s.Community == "" { + gs.Community = "public" + } else { + gs.Community = s.Community + } + } + + gs.MaxRepetitions = s.MaxRepetitions + + if s.Version == 3 { + gs.ContextName = s.ContextName + + sp := &gosnmp.UsmSecurityParameters{} + gs.SecurityParameters = sp + gs.SecurityModel = gosnmp.UserSecurityModel + + switch strings.ToLower(s.SecLevel) { + case "noauthnopriv", "": + gs.MsgFlags = gosnmp.NoAuthNoPriv + case "authnopriv": + gs.MsgFlags = gosnmp.AuthNoPriv + case "authpriv": + gs.MsgFlags = gosnmp.AuthPriv + default: + return GosnmpWrapper{}, fmt.Errorf("invalid secLevel") + } + + 
sp.UserName = s.SecName + + switch strings.ToLower(s.AuthProtocol) { + case "md5": + sp.AuthenticationProtocol = gosnmp.MD5 + case "sha": + sp.AuthenticationProtocol = gosnmp.SHA + case "": + sp.AuthenticationProtocol = gosnmp.NoAuth + default: + return GosnmpWrapper{}, fmt.Errorf("invalid authProtocol") + } + + sp.AuthenticationPassphrase = s.AuthPassword + + switch strings.ToLower(s.PrivProtocol) { + case "des": + sp.PrivacyProtocol = gosnmp.DES + case "aes": + sp.PrivacyProtocol = gosnmp.AES + case "": + sp.PrivacyProtocol = gosnmp.NoPriv + default: + return GosnmpWrapper{}, fmt.Errorf("invalid privProtocol") + } + + sp.PrivacyPassphrase = s.PrivPassword + + sp.AuthoritativeEngineID = s.EngineID + + sp.AuthoritativeEngineBoots = s.EngineBoots + + sp.AuthoritativeEngineTime = s.EngineTime + } + return gs, nil +} + +// SetAgent takes a url (scheme://host:port) and sets the wrapped +// GoSNMP struct's corresponding fields. This shouldn't be called +// after using the wrapped GoSNMP struct, for example after +// connecting. 
+func (gs *GosnmpWrapper) SetAgent(agent string) error { + if !strings.Contains(agent, "://") { + agent = "udp://" + agent + } + + u, err := url.Parse(agent) + if err != nil { + return err + } + + switch u.Scheme { + case "tcp": + gs.Transport = "tcp" + case "", "udp": + gs.Transport = "udp" + default: + return fmt.Errorf("unsupported scheme: %v", u.Scheme) + } + + gs.Target = u.Hostname() + + portStr := u.Port() + if portStr == "" { + portStr = "161" + } + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + return fmt.Errorf("parsing port: %w", err) + } + gs.Port = uint16(port) + return nil +} diff --git a/plugins/inputs/snmp/snmp.go b/plugins/inputs/snmp/snmp.go index 57f29bfb0..737be06f6 100644 --- a/plugins/inputs/snmp/snmp.go +++ b/plugins/inputs/snmp/snmp.go @@ -7,7 +7,6 @@ import ( "log" "math" "net" - "net/url" "os/exec" "strconv" "strings" @@ -16,6 +15,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/snmp" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/wlog" "github.com/soniah/gosnmp" @@ -82,10 +82,7 @@ func execCmd(arg0 string, args ...string) ([]byte, error) { out, err := execCommand(arg0, args...).Output() if err != nil { if err, ok := err.(*exec.ExitError); ok { - return nil, NestedError{ - Err: err, - NestedErr: fmt.Errorf("%s", bytes.TrimRight(err.Stderr, "\r\n")), - } + return nil, fmt.Errorf("%s: %w", bytes.TrimRight(err.Stderr, "\r\n"), err) } return nil, err } @@ -97,32 +94,8 @@ type Snmp struct { // The SNMP agent to query. Format is [SCHEME://]ADDR[:PORT] (e.g. // udp://1.2.3.4:161). If the scheme is not specified then "udp" is used. Agents []string `toml:"agents"` - // Timeout to wait for a response. 
- Timeout internal.Duration `toml:"timeout"` - Retries int `toml:"retries"` - // Values: 1, 2, 3 - Version uint8 `toml:"version"` - // Parameters for Version 1 & 2 - Community string `toml:"community"` - - // Parameters for Version 2 & 3 - MaxRepetitions uint8 `toml:"max_repetitions"` - - // Parameters for Version 3 - ContextName string `toml:"context_name"` - // Values: "noAuthNoPriv", "authNoPriv", "authPriv" - SecLevel string `toml:"sec_level"` - SecName string `toml:"sec_name"` - // Values: "MD5", "SHA", "". Default: "" - AuthProtocol string `toml:"auth_protocol"` - AuthPassword string `toml:"auth_password"` - // Values: "DES", "AES", "". Default: "" - PrivProtocol string `toml:"priv_protocol"` - PrivPassword string `toml:"priv_password"` - EngineID string `toml:"-"` - EngineBoots uint32 `toml:"-"` - EngineTime uint32 `toml:"-"` + snmp.ClientConfig Tables []Table `toml:"table"` @@ -144,14 +117,14 @@ func (s *Snmp) init() error { s.connectionCache = make([]snmpConnection, len(s.Agents)) for i := range s.Tables { - if err := s.Tables[i].init(); err != nil { - return Errorf(err, "initializing table %s", s.Tables[i].Name) + if err := s.Tables[i].Init(); err != nil { + return fmt.Errorf("initializing table %s: %w", s.Tables[i].Name, err) } } for i := range s.Fields { if err := s.Fields[i].init(); err != nil { - return Errorf(err, "initializing field %s", s.Fields[i].Name) + return fmt.Errorf("initializing field %s: %w", s.Fields[i].Name, err) } } @@ -181,8 +154,8 @@ type Table struct { initialized bool } -// init() builds & initializes the nested fields. -func (t *Table) init() error { +// Init() builds & initializes the nested fields. 
+func (t *Table) Init() error { if t.initialized { return nil } @@ -194,7 +167,7 @@ func (t *Table) init() error { // initialize all the nested fields for i := range t.Fields { if err := t.Fields[i].init(); err != nil { - return Errorf(err, "initializing field %s", t.Fields[i].Name) + return fmt.Errorf("initializing field %s: %w", t.Fields[i].Name, err) } } @@ -266,7 +239,7 @@ func (f *Field) init() error { _, oidNum, oidText, conversion, err := SnmpTranslate(f.Oid) if err != nil { - return Errorf(err, "translating") + return fmt.Errorf("translating: %w", err) } f.Oid = oidNum if f.Name == "" { @@ -301,36 +274,30 @@ type RTableRow struct { Fields map[string]interface{} } -// NestedError wraps an error returned from deeper in the code. -type NestedError struct { - // Err is the error from where the NestedError was constructed. - Err error - // NestedError is the error that was passed back from the called function. - NestedErr error +type walkError struct { + msg string + err error } -// Error returns a concatenated string of all the nested errors. -func (ne NestedError) Error() string { - return ne.Err.Error() + ": " + ne.NestedErr.Error() +func (e *walkError) Error() string { + return e.msg } -// Errorf is a convenience function for constructing a NestedError. 
-func Errorf(err error, msg string, format ...interface{}) error { - return NestedError{ - NestedErr: err, - Err: fmt.Errorf(msg, format...), - } +func (e *walkError) Unwrap() error { + return e.err } func init() { inputs.Add("snmp", func() telegraf.Input { return &Snmp{ - Name: "snmp", - Retries: 3, - MaxRepetitions: 10, - Timeout: internal.Duration{Duration: 5 * time.Second}, - Version: 2, - Community: "public", + Name: "snmp", + ClientConfig: snmp.ClientConfig{ + Retries: 3, + MaxRepetitions: 10, + Timeout: internal.Duration{Duration: 5 * time.Second}, + Version: 2, + Community: "public", + }, } }) } @@ -360,7 +327,7 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error { defer wg.Done() gs, err := s.getConnection(i) if err != nil { - acc.AddError(Errorf(err, "agent %s", agent)) + acc.AddError(fmt.Errorf("agent %s: %w", agent, err)) return } @@ -371,13 +338,13 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error { } topTags := map[string]string{} if err := s.gatherTable(acc, gs, t, topTags, false); err != nil { - acc.AddError(Errorf(err, "agent %s", agent)) + acc.AddError(fmt.Errorf("agent %s: %w", agent, err)) } // Now is the real tables. for _, t := range s.Tables { if err := s.gatherTable(acc, gs, t, topTags, true); err != nil { - acc.AddError(Errorf(err, "agent %s: gathering table %s", agent, t.Name)) + acc.AddError(fmt.Errorf("agent %s: gathering table %s: %w", agent, t.Name, err)) } } }(i, agent) @@ -447,19 +414,19 @@ func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) { // empty string. This results in all the non-table fields sharing the same // index, and being added on the same row. 
if pkt, err := gs.Get([]string{oid}); err != nil { - return nil, Errorf(err, "performing get on field %s", f.Name) + return nil, fmt.Errorf("performing get on field %s: %w", f.Name, err) } else if pkt != nil && len(pkt.Variables) > 0 && pkt.Variables[0].Type != gosnmp.NoSuchObject && pkt.Variables[0].Type != gosnmp.NoSuchInstance { ent := pkt.Variables[0] fv, err := fieldConvert(f.Conversion, ent.Value) if err != nil { - return nil, Errorf(err, "converting %q (OID %s) for field %s", ent.Value, ent.Name, f.Name) + return nil, fmt.Errorf("converting %q (OID %s) for field %s: %w", ent.Value, ent.Name, f.Name, err) } ifv[""] = fv } } else { err := gs.Walk(oid, func(ent gosnmp.SnmpPDU) error { if len(ent.Name) <= len(oid) || ent.Name[:len(oid)+1] != oid+"." { - return NestedError{} // break the walk + return &walkError{} // break the walk } idx := ent.Name[len(oid):] @@ -485,14 +452,20 @@ func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) { fv, err := fieldConvert(f.Conversion, ent.Value) if err != nil { - return Errorf(err, "converting %q (OID %s) for field %s", ent.Value, ent.Name, f.Name) + return &walkError{ + msg: fmt.Sprintf("converting %q (OID %s) for field %s", ent.Value, ent.Name, f.Name), + err: err, + } } ifv[idx] = fv return nil }) if err != nil { - if _, ok := err.(NestedError); !ok { - return nil, Errorf(err, "performing bulk walk for field %s", f.Name) + // Our callback always wraps errors in a walkError. + // If this error isn't a walkError, we know it's not + // from the callback + if _, ok := err.(*walkError); !ok { + return nil, fmt.Errorf("performing bulk walk for field %s: %w", f.Name, err) } } } @@ -546,56 +519,6 @@ type snmpConnection interface { Get(oids []string) (*gosnmp.SnmpPacket, error) } -// gosnmpWrapper wraps a *gosnmp.GoSNMP object so we can use it as a snmpConnection. -type gosnmpWrapper struct { - *gosnmp.GoSNMP -} - -// Host returns the value of GoSNMP.Target. 
-func (gsw gosnmpWrapper) Host() string { - return gsw.Target -} - -// Walk wraps GoSNMP.Walk() or GoSNMP.BulkWalk(), depending on whether the -// connection is using SNMPv1 or newer. -// Also, if any error is encountered, it will just once reconnect and try again. -func (gsw gosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error { - var err error - // On error, retry once. - // Unfortunately we can't distinguish between an error returned by gosnmp, and one returned by the walk function. - for i := 0; i < 2; i++ { - if gsw.Version == gosnmp.Version1 { - err = gsw.GoSNMP.Walk(oid, fn) - } else { - err = gsw.GoSNMP.BulkWalk(oid, fn) - } - if err == nil { - return nil - } - if err := gsw.GoSNMP.Connect(); err != nil { - return Errorf(err, "reconnecting") - } - } - return err -} - -// Get wraps GoSNMP.GET(). -// If any error is encountered, it will just once reconnect and try again. -func (gsw gosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) { - var err error - var pkt *gosnmp.SnmpPacket - for i := 0; i < 2; i++ { - pkt, err = gsw.GoSNMP.Get(oids) - if err == nil { - return pkt, nil - } - if err := gsw.GoSNMP.Connect(); err != nil { - return nil, Errorf(err, "reconnecting") - } - } - return nil, err -} - // getConnection creates a snmpConnection (*gosnmp.GoSNMP) object and caches the // result using `agentIndex` as the cache key. This is done to allow multiple // connections to a single address. 
It is an error to use a connection in @@ -607,119 +530,21 @@ func (s *Snmp) getConnection(idx int) (snmpConnection, error) { agent := s.Agents[idx] - gs := gosnmpWrapper{&gosnmp.GoSNMP{}} - s.connectionCache[idx] = gs - - if !strings.Contains(agent, "://") { - agent = "udp://" + agent + var err error + var gs snmp.GosnmpWrapper + gs, err = snmp.NewWrapper(s.ClientConfig) + if err != nil { + return nil, err } - - u, err := url.Parse(agent) + err = gs.SetAgent(agent) if err != nil { return nil, err } - switch u.Scheme { - case "tcp": - gs.Transport = "tcp" - case "", "udp": - gs.Transport = "udp" - default: - return nil, fmt.Errorf("unsupported scheme: %v", u.Scheme) - } - - gs.Target = u.Hostname() - - portStr := u.Port() - if portStr == "" { - portStr = "161" - } - port, err := strconv.ParseUint(portStr, 10, 16) - if err != nil { - return nil, Errorf(err, "parsing port") - } - gs.Port = uint16(port) - - gs.Timeout = s.Timeout.Duration - - gs.Retries = s.Retries - - switch s.Version { - case 3: - gs.Version = gosnmp.Version3 - case 2, 0: - gs.Version = gosnmp.Version2c - case 1: - gs.Version = gosnmp.Version1 - default: - return nil, fmt.Errorf("invalid version") - } - - if s.Version < 3 { - if s.Community == "" { - gs.Community = "public" - } else { - gs.Community = s.Community - } - } - - gs.MaxRepetitions = s.MaxRepetitions - - if s.Version == 3 { - gs.ContextName = s.ContextName - - sp := &gosnmp.UsmSecurityParameters{} - gs.SecurityParameters = sp - gs.SecurityModel = gosnmp.UserSecurityModel - - switch strings.ToLower(s.SecLevel) { - case "noauthnopriv", "": - gs.MsgFlags = gosnmp.NoAuthNoPriv - case "authnopriv": - gs.MsgFlags = gosnmp.AuthNoPriv - case "authpriv": - gs.MsgFlags = gosnmp.AuthPriv - default: - return nil, fmt.Errorf("invalid secLevel") - } - - sp.UserName = s.SecName - - switch strings.ToLower(s.AuthProtocol) { - case "md5": - sp.AuthenticationProtocol = gosnmp.MD5 - case "sha": - sp.AuthenticationProtocol = gosnmp.SHA - case "": - 
sp.AuthenticationProtocol = gosnmp.NoAuth - default: - return nil, fmt.Errorf("invalid authProtocol") - } - - sp.AuthenticationPassphrase = s.AuthPassword - - switch strings.ToLower(s.PrivProtocol) { - case "des": - sp.PrivacyProtocol = gosnmp.DES - case "aes": - sp.PrivacyProtocol = gosnmp.AES - case "": - sp.PrivacyProtocol = gosnmp.NoPriv - default: - return nil, fmt.Errorf("invalid privProtocol") - } - - sp.PrivacyPassphrase = s.PrivPassword - - sp.AuthoritativeEngineID = s.EngineID - - sp.AuthoritativeEngineBoots = s.EngineBoots - - sp.AuthoritativeEngineTime = s.EngineTime - } + s.connectionCache[idx] = gs if err := gs.Connect(); err != nil { - return nil, Errorf(err, "setting up connection") + return nil, fmt.Errorf("setting up connection: %w", err) } return gs, nil @@ -881,7 +706,7 @@ func snmpTable(oid string) (mibName string, oidNum string, oidText string, field func snmpTableCall(oid string) (mibName string, oidNum string, oidText string, fields []Field, err error) { mibName, oidNum, oidText, _, err = SnmpTranslate(oid) if err != nil { - return "", "", "", nil, Errorf(err, "translating") + return "", "", "", nil, fmt.Errorf("translating: %w", err) } mibPrefix := mibName + "::" @@ -918,7 +743,7 @@ func snmpTableCall(oid string) (mibName string, oidNum string, oidText string, f // this won't actually try to run a query. The `-Ch` will just cause it to dump headers. 
out, err := execCmd("snmptable", "-Ch", "-Cl", "-c", "public", "127.0.0.1", oidFullName) if err != nil { - return "", "", "", nil, Errorf(err, "getting table columns") + return "", "", "", nil, fmt.Errorf("getting table columns: %w", err) } scanner := bufio.NewScanner(bytes.NewBuffer(out)) scanner.Scan() @@ -1016,7 +841,7 @@ func snmpTranslateCall(oid string) (mibName string, oidNum string, oidText strin scanner := bufio.NewScanner(bytes.NewBuffer(out)) ok := scanner.Scan() if !ok && scanner.Err() != nil { - return "", "", "", "", Errorf(scanner.Err(), "getting OID text") + return "", "", "", "", fmt.Errorf("getting OID text: %w", scanner.Err()) } oidText = scanner.Text() diff --git a/plugins/inputs/snmp/snmp_test.go b/plugins/inputs/snmp/snmp_test.go index d29b525ad..9991ff741 100644 --- a/plugins/inputs/snmp/snmp_test.go +++ b/plugins/inputs/snmp/snmp_test.go @@ -10,6 +10,8 @@ import ( "time" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/snmp" + config "github.com/influxdata/telegraf/internal/snmp" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/testutil" "github.com/influxdata/toml" @@ -88,13 +90,15 @@ func TestSampleConfig(t *testing.T) { require.NoError(t, err) expected := &Snmp{ - Agents: []string{"udp://127.0.0.1:161"}, - Timeout: internal.Duration{Duration: 5 * time.Second}, - Version: 2, - Community: "public", - MaxRepetitions: 10, - Retries: 3, - Name: "snmp", + Agents: []string{"udp://127.0.0.1:161"}, + ClientConfig: config.ClientConfig{ + Timeout: internal.Duration{Duration: 5 * time.Second}, + Version: 2, + Community: "public", + MaxRepetitions: 10, + Retries: 3, + }, + Name: "snmp", } require.Equal(t, expected, conf) } @@ -141,7 +145,7 @@ func TestTableInit(t *testing.T) { {Oid: "TEST::description", Name: "description", IsTag: true}, }, } - err := tbl.init() + err := tbl.Init() require.NoError(t, err) assert.Equal(t, "testTable", tbl.Name) @@ -232,18 +236,20 @@ func 
TestSnmpInit_noTranslate(t *testing.T) { func TestGetSNMPConnection_v2(t *testing.T) { s := &Snmp{ - Agents: []string{"1.2.3.4:567", "1.2.3.4", "udp://127.0.0.1"}, - Timeout: internal.Duration{Duration: 3 * time.Second}, - Retries: 4, - Version: 2, - Community: "foo", + Agents: []string{"1.2.3.4:567", "1.2.3.4", "udp://127.0.0.1"}, + ClientConfig: config.ClientConfig{ + Timeout: internal.Duration{Duration: 3 * time.Second}, + Retries: 4, + Version: 2, + Community: "foo", + }, } err := s.init() require.NoError(t, err) gsc, err := s.getConnection(0) require.NoError(t, err) - gs := gsc.(gosnmpWrapper) + gs := gsc.(snmp.GosnmpWrapper) assert.Equal(t, "1.2.3.4", gs.Target) assert.EqualValues(t, 567, gs.Port) assert.Equal(t, gosnmp.Version2c, gs.Version) @@ -252,14 +258,14 @@ func TestGetSNMPConnection_v2(t *testing.T) { gsc, err = s.getConnection(1) require.NoError(t, err) - gs = gsc.(gosnmpWrapper) + gs = gsc.(snmp.GosnmpWrapper) assert.Equal(t, "1.2.3.4", gs.Target) assert.EqualValues(t, 161, gs.Port) assert.Equal(t, "udp", gs.Transport) gsc, err = s.getConnection(2) require.NoError(t, err) - gs = gsc.(gosnmpWrapper) + gs = gsc.(snmp.GosnmpWrapper) assert.Equal(t, "127.0.0.1", gs.Target) assert.EqualValues(t, 161, gs.Port) assert.Equal(t, "udp", gs.Transport) @@ -280,7 +286,7 @@ func TestGetSNMPConnectionTCP(t *testing.T) { wg.Add(1) gsc, err := s.getConnection(0) require.NoError(t, err) - gs := gsc.(gosnmpWrapper) + gs := gsc.(snmp.GosnmpWrapper) assert.Equal(t, "127.0.0.1", gs.Target) assert.EqualValues(t, 56789, gs.Port) assert.Equal(t, "tcp", gs.Transport) @@ -299,26 +305,28 @@ func stubTCPServer(wg *sync.WaitGroup) { func TestGetSNMPConnection_v3(t *testing.T) { s := &Snmp{ - Agents: []string{"1.2.3.4"}, - Version: 3, - MaxRepetitions: 20, - ContextName: "mycontext", - SecLevel: "authPriv", - SecName: "myuser", - AuthProtocol: "md5", - AuthPassword: "password123", - PrivProtocol: "des", - PrivPassword: "321drowssap", - EngineID: "myengineid", - EngineBoots: 1, - 
EngineTime: 2, + Agents: []string{"1.2.3.4"}, + ClientConfig: config.ClientConfig{ + Version: 3, + MaxRepetitions: 20, + ContextName: "mycontext", + SecLevel: "authPriv", + SecName: "myuser", + AuthProtocol: "md5", + AuthPassword: "password123", + PrivProtocol: "des", + PrivPassword: "321drowssap", + EngineID: "myengineid", + EngineBoots: 1, + EngineTime: 2, + }, } err := s.init() require.NoError(t, err) gsc, err := s.getConnection(0) require.NoError(t, err) - gs := gsc.(gosnmpWrapper) + gs := gsc.(snmp.GosnmpWrapper) assert.Equal(t, gs.Version, gosnmp.Version3) sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters) assert.Equal(t, "1.2.3.4", gsc.Host()) @@ -394,7 +402,9 @@ func TestGosnmpWrapper_walk_retry(t *testing.T) { require.NoError(t, err) conn := gs.Conn - gsw := gosnmpWrapper{gs} + gsw := snmp.GosnmpWrapper{ + GoSNMP: gs, + } err = gsw.Walk(".1.0.0", func(_ gosnmp.SnmpPDU) error { return nil }) srvr.Close() wg.Wait() @@ -442,7 +452,9 @@ func TestGosnmpWrapper_get_retry(t *testing.T) { require.NoError(t, err) conn := gs.Conn - gsw := gosnmpWrapper{gs} + gsw := snmp.GosnmpWrapper{ + GoSNMP: gs, + } _, err = gsw.Get([]string{".1.0.0"}) srvr.Close() wg.Wait() @@ -788,16 +800,3 @@ func TestSnmpTableCache_hit(t *testing.T) { assert.Equal(t, []Field{{Name: "d"}}, fields) assert.Equal(t, fmt.Errorf("e"), err) } - -func TestError(t *testing.T) { - e := fmt.Errorf("nested error") - err := Errorf(e, "top error %d", 123) - require.Error(t, err) - - ne, ok := err.(NestedError) - require.True(t, ok) - assert.Equal(t, e, ne.NestedErr) - - assert.Contains(t, err.Error(), "top error 123") - assert.Contains(t, err.Error(), "nested error") -} diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 307fe540b..c84ee8111 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -9,6 +9,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/enum" _ "github.com/influxdata/telegraf/plugins/processors/execd" _ 
"github.com/influxdata/telegraf/plugins/processors/filepath" + _ "github.com/influxdata/telegraf/plugins/processors/ifname" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/pivot" diff --git a/plugins/processors/ifname/README.md b/plugins/processors/ifname/README.md new file mode 100644 index 000000000..1335148d3 --- /dev/null +++ b/plugins/processors/ifname/README.md @@ -0,0 +1,75 @@ +# Network Interface Name Processor Plugin + +The `ifname` plugin looks up network interface names using SNMP. + +### Configuration: + +```toml +[[processors.ifname]] + ## Name of tag holding the interface number + # tag = "ifIndex" + + ## Name of output tag where service name will be added + # dest = "ifName" + + ## Name of tag of the SNMP agent to request the interface name from + # agent = "agent" + + ## Timeout for each request. + # timeout = "5s" + + ## SNMP version; can be 1, 2, or 3. + # version = 2 + + ## SNMP community string. + # community = "public" + + ## Number of retries to attempt. + # retries = 3 + + ## The GETBULK max-repetitions parameter. + # max_repetitions = 10 + + ## SNMPv3 authentication and encryption options. + ## + ## Security Name. + # sec_name = "myuser" + ## Authentication protocol; one of "MD5", "SHA", or "". + # auth_protocol = "MD5" + ## Authentication password. + # auth_password = "pass" + ## Security Level; one of "noAuthNoPriv", "authNoPriv", or "authPriv". + # sec_level = "authNoPriv" + ## Context Name. + # context_name = "" + ## Privacy protocol used for encrypted messages; one of "DES", "AES" or "". + # priv_protocol = "" + ## Privacy password used for encrypted messages. + # priv_password = "" + + ## max_parallel_lookups is the maximum number of SNMP requests to + ## make at the same time. 
+ # max_parallel_lookups = 100 + + ## ordered controls whether or not the metrics need to stay in the + ## same order this plugin received them in. If false, this plugin + ## may change the order when data is cached. If you need metrics to + ## stay in order set this to true. keeping the metrics ordered may + ## be slightly slower + # ordered = false +``` + +### Example processing: + +Example config: + +```toml +[[processors.ifname]] + tag = "ifIndex" + dest = "ifName" +``` + +```diff +- foo,ifIndex=2,agent=127.0.0.1 field=123 1502489900000000000 ++ foo,ifIndex=2,agent=127.0.0.1,ifName=eth0 field=123 1502489900000000000 +``` diff --git a/plugins/processors/ifname/cache.go b/plugins/processors/ifname/cache.go new file mode 100644 index 000000000..c3232f531 --- /dev/null +++ b/plugins/processors/ifname/cache.go @@ -0,0 +1,78 @@ +package ifname + +// See https://girai.dev/blog/lru-cache-implementation-in-go/ + +import ( + "container/list" +) + +// Type aliases let the implementation be more generic +type keyType = string +type valType = nameMap + +type hashType map[keyType]*list.Element + +type LRUCache struct { + cap uint // capacity + l *list.List // doubly linked list + m hashType // hash table for checking if list node exists +} + +// Pair is the value of a list node. +type Pair struct { + key keyType + value valType +} + +// initializes a new LRUCache. +func NewLRUCache(capacity uint) LRUCache { + return LRUCache{ + cap: capacity, + l: new(list.List), + m: make(hashType, capacity), + } +} + +// Get a list node from the hash map. 
+func (c *LRUCache) Get(key keyType) (valType, bool) { + // check if list node exists + if node, ok := c.m[key]; ok { + val := node.Value.(*list.Element).Value.(Pair).value + // move node to front + c.l.MoveToFront(node) + return val, true + } + return valType{}, false +} + +// Put key and value in the LRUCache +func (c *LRUCache) Put(key keyType, value valType) { + // check if list node exists + if node, ok := c.m[key]; ok { + // move the node to front + c.l.MoveToFront(node) + // update the value of a list node + node.Value.(*list.Element).Value = Pair{key: key, value: value} + } else { + // evict the oldest node when full; skip when empty so a zero capacity cannot dereference a nil Back() + if c.l.Len() > 0 && uint(c.l.Len()) >= c.cap { + // get the key that we want to delete + idx := c.l.Back().Value.(*list.Element).Value.(Pair).key + // delete the node pointer in the hash map by key + delete(c.m, idx) + // remove the last list node + c.l.Remove(c.l.Back()) + } + // initialize a list node + node := &list.Element{ + Value: Pair{ + key: key, + value: value, + }, + } + // push the new list node into the list + ptr := c.l.PushFront(node) + // save the node pointer in the hash map + c.m[key] = ptr + } +} diff --git a/plugins/processors/ifname/cache_test.go b/plugins/processors/ifname/cache_test.go new file mode 100644 index 000000000..986c2d494 --- /dev/null +++ b/plugins/processors/ifname/cache_test.go @@ -0,0 +1,23 @@ +package ifname + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCache(t *testing.T) { + c := NewLRUCache(2) + + c.Put("ones", nameMap{1: "one"}) + twoMap := nameMap{2: "two"} + c.Put("twos", twoMap) + c.Put("threes", nameMap{3: "three"}) + + _, ok := c.Get("ones") + require.False(t, ok) + + v, ok := c.Get("twos") + require.True(t, ok) + require.Equal(t, twoMap, v) +} diff --git a/plugins/processors/ifname/ifname.go b/plugins/processors/ifname/ifname.go new file mode 100644 index 000000000..be57fb14c --- /dev/null +++ b/plugins/processors/ifname/ifname.go @@ -0,0 +1,370 @@ +package ifname 
+ +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/snmp" + si "github.com/influxdata/telegraf/plugins/inputs/snmp" + "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel" +) + +var sampleConfig = ` + ## Name of tag holding the interface number + # tag = "ifIndex" + + ## Name of output tag where service name will be added + # dest = "ifName" + + ## Name of tag of the SNMP agent to request the interface name from + # agent = "agent" + + ## Timeout for each request. + # timeout = "5s" + + ## SNMP version; can be 1, 2, or 3. + # version = 2 + + ## SNMP community string. + # community = "public" + + ## Number of retries to attempt. + # retries = 3 + + ## The GETBULK max-repetitions parameter. + # max_repetitions = 10 + + ## SNMPv3 authentication and encryption options. + ## + ## Security Name. + # sec_name = "myuser" + ## Authentication protocol; one of "MD5", "SHA", or "". + # auth_protocol = "MD5" + ## Authentication password. + # auth_password = "pass" + ## Security Level; one of "noAuthNoPriv", "authNoPriv", or "authPriv". + # sec_level = "authNoPriv" + ## Context Name. + # context_name = "" + ## Privacy protocol used for encrypted messages; one of "DES", "AES" or "". + # priv_protocol = "" + ## Privacy password used for encrypted messages. + # priv_password = "" + + ## max_parallel_lookups is the maximum number of SNMP requests to + ## make at the same time. + # max_parallel_lookups = 100 + + ## ordered controls whether or not the metrics need to stay in the + ## same order this plugin received them in. If false, this plugin + ## may change the order when data is cached. If you need metrics to + ## stay in order set this to true. 
keeping the metrics ordered may + ## be slightly slower + # ordered = false +` + +type nameMap map[uint64]string +type mapFunc func(agent string) (nameMap, error) + +type makeTableFunc func(string) (*si.Table, error) + +type sigMap map[string](chan struct{}) + +type IfName struct { + SourceTag string `toml:"tag"` + DestTag string `toml:"dest"` + AgentTag string `toml:"agent"` + + snmp.ClientConfig + + CacheSize uint `toml:"max_cache_entries"` + MaxParallelLookups int `toml:"max_parallel_lookups"` + Ordered bool `toml:"ordered"` + + Log telegraf.Logger `toml:"-"` + + ifTable *si.Table `toml:"-"` + ifXTable *si.Table `toml:"-"` + + rwLock sync.RWMutex `toml:"-"` + cache *LRUCache `toml:"-"` + + parallel parallel.Parallel `toml:"-"` + acc telegraf.Accumulator `toml:"-"` + + getMapRemote mapFunc `toml:"-"` + makeTable makeTableFunc `toml:"-"` + + gsBase snmp.GosnmpWrapper `toml:"-"` + + sigs sigMap `toml:"-"` + sigsLock sync.Mutex `toml:"-"` +} + +func (d *IfName) SampleConfig() string { + return sampleConfig +} + +func (d *IfName) Description() string { + return "Add a tag of the network interface name looked up over SNMP by interface number" +} + +func (d *IfName) Init() error { + d.getMapRemote = d.getMapRemoteNoMock + d.makeTable = makeTableNoMock + + c := NewLRUCache(d.CacheSize) + d.cache = &c + + d.sigs = make(sigMap) + + return nil +} + +func (d *IfName) addTag(metric telegraf.Metric) error { + agent, ok := metric.GetTag(d.AgentTag) + if !ok { + //agent tag missing + return nil + } + + num_s, ok := metric.GetTag(d.SourceTag) + if !ok { + //source tag missing + return nil + } + + num, err := strconv.ParseUint(num_s, 10, 64) + if err != nil { + return fmt.Errorf("couldn't parse source tag as uint") + } + + m, err := d.getMap(agent) + if err != nil { + return fmt.Errorf("couldn't retrieve the table of interface names: %w", err) + } + + name, found := m[num] + if !found { + return fmt.Errorf("interface number %d isn't in the table of interface names", num) + } + 
metric.AddTag(d.DestTag, name) + return nil +} + +func (d *IfName) Start(acc telegraf.Accumulator) error { + d.acc = acc + + var err error + d.gsBase, err = snmp.NewWrapper(d.ClientConfig) + if err != nil { + return fmt.Errorf("parsing SNMP client config: %w", err) + } + + d.ifTable, err = d.makeTable("IF-MIB::ifTable") + if err != nil { + return fmt.Errorf("looking up ifTable in local MIB: %w", err) + } + d.ifXTable, err = d.makeTable("IF-MIB::ifXTable") + if err != nil { + return fmt.Errorf("looking up ifXTable in local MIB: %w", err) + } + + fn := func(m telegraf.Metric) []telegraf.Metric { + err := d.addTag(m) + if err != nil { + d.Log.Debugf("Error adding tag %v", err) + } + return []telegraf.Metric{m} + } + + if d.Ordered { + d.parallel = parallel.NewOrdered(acc, fn, 10000, d.MaxParallelLookups) + } else { + d.parallel = parallel.NewUnordered(acc, fn, d.MaxParallelLookups) + } + return nil +} + +func (d *IfName) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { + d.parallel.Enqueue(metric) + return nil +} + +func (d *IfName) Stop() error { + d.parallel.Stop() + return nil +} + +// getMap gets the interface names map either from cache or from the SNMP +// agent +func (d *IfName) getMap(agent string) (nameMap, error) { + var sig chan struct{} + + // Check cache + d.rwLock.RLock() + m, ok := d.cache.Get(agent) + d.rwLock.RUnlock() + if ok { + return m, nil + } + + // Is this the first request for this agent? + d.sigsLock.Lock() + sig, found := d.sigs[agent] + if !found { + s := make(chan struct{}) + d.sigs[agent] = s + sig = s + } + d.sigsLock.Unlock() + + if found { + // This is not the first request. Wait for first to finish. + <-sig + // Check cache again + d.rwLock.RLock() + m, ok := d.cache.Get(agent) + d.rwLock.RUnlock() + if ok { + return m, nil + } else { + return nil, fmt.Errorf("getting remote table from cache") + } + } + + // The cache missed and this is the first request for this + // agent. 
+ + // Make the SNMP request + m, err := d.getMapRemote(agent) + if err != nil { + //failure. signal without saving to cache + d.sigsLock.Lock() + close(sig) + delete(d.sigs, agent) + d.sigsLock.Unlock() + + return nil, fmt.Errorf("getting remote table: %w", err) + } + + // Cache it + d.rwLock.Lock() + d.cache.Put(agent, m) + d.rwLock.Unlock() + + // Signal any other waiting requests for this agent and clean up + d.sigsLock.Lock() + close(sig) + delete(d.sigs, agent) + d.sigsLock.Unlock() + + return m, nil +} + +func (d *IfName) getMapRemoteNoMock(agent string) (nameMap, error) { + gs := d.gsBase + err := gs.SetAgent(agent) + if err != nil { + return nil, fmt.Errorf("parsing agent tag: %w", err) + } + + err = gs.Connect() + if err != nil { + return nil, fmt.Errorf("connecting when fetching interface names: %w", err) + } + + //try ifXtable and ifName first. if that fails, fall back to + //ifTable and ifDescr + var m nameMap + m, err = buildMap(gs, d.ifXTable, "ifName") + if err == nil { + return m, nil + } + + m, err = buildMap(gs, d.ifTable, "ifDescr") + if err == nil { + return m, nil + } + + return nil, fmt.Errorf("fetching interface names: %w", err) +} + +func init() { + processors.AddStreaming("ifname", func() telegraf.StreamingProcessor { + return &IfName{ + SourceTag: "ifIndex", + DestTag: "ifName", + AgentTag: "agent", + CacheSize: 100, + MaxParallelLookups: 100, + ClientConfig: snmp.ClientConfig{ + Retries: 3, + MaxRepetitions: 10, + Timeout: internal.Duration{Duration: 5 * time.Second}, + Version: 2, + Community: "public", + }, + } + }) +} + +func makeTableNoMock(tableName string) (*si.Table, error) { + var err error + tab := si.Table{ + Oid: tableName, + IndexAsTag: true, + } + + err = tab.Init() + if err != nil { + //Init already wraps + return nil, err + } + + return &tab, nil +} + +func buildMap(gs snmp.GosnmpWrapper, tab *si.Table, column string) (nameMap, error) { + var err error + + rtab, err := tab.Build(gs, true) + if err != nil { + //Build 
already wraps + return nil, err + } + + if len(rtab.Rows) == 0 { + return nil, fmt.Errorf("empty table") + } + + t := make(nameMap) + for _, v := range rtab.Rows { + i_str, ok := v.Tags["index"] + if !ok { + //should always have an index tag because the table should + //always have IndexAsTag true + return nil, fmt.Errorf("no index tag") + } + i, err := strconv.ParseUint(i_str, 10, 64) + if err != nil { + return nil, fmt.Errorf("index tag isn't a uint") + } + name_if, ok := v.Fields[column] + if !ok { + return nil, fmt.Errorf("field %s is missing", column) + } + name, ok := name_if.(string) + if !ok { + return nil, fmt.Errorf("field %s isn't a string", column) + } + + t[i] = name + } + return t, nil +} diff --git a/plugins/processors/ifname/ifname_test.go b/plugins/processors/ifname/ifname_test.go new file mode 100644 index 000000000..99354e7b1 --- /dev/null +++ b/plugins/processors/ifname/ifname_test.go @@ -0,0 +1,155 @@ +package ifname + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/snmp" + si "github.com/influxdata/telegraf/plugins/inputs/snmp" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestTable(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + d := IfName{} + d.Init() + tab, err := d.makeTable("IF-MIB::ifTable") + require.NoError(t, err) + + config := snmp.ClientConfig{ + Version: 2, + Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout + } + gs, err := snmp.NewWrapper(config) + require.NoError(t, err) + err = gs.SetAgent("127.0.0.1") + require.NoError(t, err) + + err = gs.Connect() + require.NoError(t, err) + + //could use ifIndex but oid index is always the same + m, err := buildMap(gs, tab, "ifDescr") + require.NoError(t, err) + require.NotEmpty(t, m) +} + +func TestIfName(t *testing.T) { + if testing.Short() { + 
t.Skip("Skipping integration test in short mode") + } + d := IfName{ + SourceTag: "ifIndex", + DestTag: "ifName", + AgentTag: "agent", + CacheSize: 1000, + ClientConfig: snmp.ClientConfig{ + Version: 2, + Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout + }, + } + err := d.Init() + require.NoError(t, err) + + acc := testutil.Accumulator{} + err = d.Start(&acc) + + require.NoError(t, err) + + m := testutil.MustMetric( + "cpu", + map[string]string{ + "ifIndex": "1", + "agent": "127.0.0.1", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ) + + expected := testutil.MustMetric( + "cpu", + map[string]string{ + "ifIndex": "1", + "agent": "127.0.0.1", + "ifName": "lo", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ) + + err = d.addTag(m) + require.NoError(t, err) + + testutil.RequireMetricEqual(t, expected, m) +} + +func TestGetMap(t *testing.T) { + d := IfName{ + SourceTag: "ifIndex", + DestTag: "ifName", + AgentTag: "agent", + CacheSize: 1000, + ClientConfig: snmp.ClientConfig{ + Version: 2, + Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout + }, + } + + // This test mocks the snmp transaction so don't run net-snmp + // commands to look up table names. + d.makeTable = func(agent string) (*si.Table, error) { + return &si.Table{}, nil + } + err := d.Init() + require.NoError(t, err) + + // Request the same agent multiple times in goroutines. The first + // request should make the mocked remote call and the others + // should block until the response is cached, then return the + // cached response. 
+ + expected := nameMap{ + 1: "ifname1", + 2: "ifname2", + } + + var wgRemote sync.WaitGroup + var remoteCalls int32 + + wgRemote.Add(1) + d.getMapRemote = func(agent string) (nameMap, error) { + atomic.AddInt32(&remoteCalls, 1) + wgRemote.Wait() //don't return until all requests are made + return expected, nil + } + + const thMax = 3 + var wgReq sync.WaitGroup + + for th := 0; th < thMax; th++ { + wgReq.Add(1) + go func() { + defer wgReq.Done() + m, err := d.getMap("agent") + require.NoError(t, err) + require.Equal(t, expected, m) + }() + } + + //signal mocked remote call to finish + wgRemote.Done() + + //wait for requests to finish + wgReq.Wait() + + //remote call should only happen once + require.Equal(t, int32(1), remoteCalls) + +}