ifname processor: expire old cached entries (#7838)
This commit is contained in:
parent
9a5fd6588c
commit
9b58590df3
|
|
@ -59,6 +59,11 @@ Telegraf minimum version: Telegraf 1.15.0
|
|||
## stay in order set this to true. keeping the metrics ordered may
|
||||
## be slightly slower
|
||||
# ordered = false
|
||||
|
||||
## cache_ttl is the amount of time interface names are cached for a
|
||||
## given agent. After this period elapses if names are needed they
|
||||
## will be retrieved again.
|
||||
# cache_ttl = "8h"
|
||||
```
|
||||
|
||||
### Example processing:
|
||||
|
|
|
|||
|
|
@ -6,9 +6,7 @@ import (
|
|||
"container/list"
|
||||
)
|
||||
|
||||
// Type aliases let the implementation be more generic
|
||||
type keyType = string
|
||||
type valType = nameMap
|
||||
type LRUValType = TTLValType
|
||||
|
||||
type hashType map[keyType]*list.Element
|
||||
|
||||
|
|
@ -21,7 +19,7 @@ type LRUCache struct {
|
|||
// Pair is the value of a list node.
|
||||
type Pair struct {
|
||||
key keyType
|
||||
value valType
|
||||
value LRUValType
|
||||
}
|
||||
|
||||
// initializes a new LRUCache.
|
||||
|
|
@ -34,7 +32,7 @@ func NewLRUCache(capacity uint) LRUCache {
|
|||
}
|
||||
|
||||
// Get a list node from the hash map.
|
||||
func (c *LRUCache) Get(key keyType) (valType, bool) {
|
||||
func (c *LRUCache) Get(key keyType) (LRUValType, bool) {
|
||||
// check if list node exists
|
||||
if node, ok := c.m[key]; ok {
|
||||
val := node.Value.(*list.Element).Value.(Pair).value
|
||||
|
|
@ -42,11 +40,11 @@ func (c *LRUCache) Get(key keyType) (valType, bool) {
|
|||
c.l.MoveToFront(node)
|
||||
return val, true
|
||||
}
|
||||
return valType{}, false
|
||||
return LRUValType{}, false
|
||||
}
|
||||
|
||||
// Put key and value in the LRUCache
|
||||
func (c *LRUCache) Put(key keyType, value valType) {
|
||||
func (c *LRUCache) Put(key keyType, value LRUValType) {
|
||||
// check if list node exists
|
||||
if node, ok := c.m[key]; ok {
|
||||
// move the node to front
|
||||
|
|
@ -76,3 +74,10 @@ func (c *LRUCache) Put(key keyType, value valType) {
|
|||
c.m[key] = ptr
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LRUCache) Delete(key keyType) {
|
||||
if node, ok := c.m[key]; ok {
|
||||
c.l.Remove(node)
|
||||
delete(c.m, key)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,10 +9,10 @@ import (
|
|||
func TestCache(t *testing.T) {
|
||||
c := NewLRUCache(2)
|
||||
|
||||
c.Put("ones", nameMap{1: "one"})
|
||||
twoMap := nameMap{2: "two"}
|
||||
c.Put("ones", LRUValType{val: nameMap{1: "one"}})
|
||||
twoMap := LRUValType{val: nameMap{2: "two"}}
|
||||
c.Put("twos", twoMap)
|
||||
c.Put("threes", nameMap{3: "three"})
|
||||
c.Put("threes", LRUValType{val: nameMap{3: "three"}})
|
||||
|
||||
_, ok := c.Get("ones")
|
||||
require.False(t, ok)
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/snmp"
|
||||
si "github.com/influxdata/telegraf/plugins/inputs/snmp"
|
||||
|
|
@ -66,11 +67,18 @@ var sampleConfig = `
|
|||
## stay in order set this to true. keeping the metrics ordered may
|
||||
## be slightly slower
|
||||
# ordered = false
|
||||
|
||||
## cache_ttl is the amount of time interface names are cached for a
|
||||
## given agent. After this period elapses if names are needed they
|
||||
## will be retrieved again.
|
||||
# cache_ttl = "8h"
|
||||
`
|
||||
|
||||
type nameMap map[uint64]string
|
||||
type mapFunc func(agent string) (nameMap, error)
|
||||
type keyType = string
|
||||
type valType = nameMap
|
||||
|
||||
type mapFunc func(agent string) (nameMap, error)
|
||||
type makeTableFunc func(string) (*si.Table, error)
|
||||
|
||||
type sigMap map[string](chan struct{})
|
||||
|
|
@ -82,9 +90,10 @@ type IfName struct {
|
|||
|
||||
snmp.ClientConfig
|
||||
|
||||
CacheSize uint `toml:"max_cache_entries"`
|
||||
MaxParallelLookups int `toml:"max_parallel_lookups"`
|
||||
Ordered bool `toml:"ordered"`
|
||||
CacheSize uint `toml:"max_cache_entries"`
|
||||
MaxParallelLookups int `toml:"max_parallel_lookups"`
|
||||
Ordered bool `toml:"ordered"`
|
||||
CacheTTL config.Duration `toml:"cache_ttl"`
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
|
|
@ -92,7 +101,7 @@ type IfName struct {
|
|||
ifXTable *si.Table `toml:"-"`
|
||||
|
||||
rwLock sync.RWMutex `toml:"-"`
|
||||
cache *LRUCache `toml:"-"`
|
||||
cache *TTLCache `toml:"-"`
|
||||
|
||||
parallel parallel.Parallel `toml:"-"`
|
||||
acc telegraf.Accumulator `toml:"-"`
|
||||
|
|
@ -106,6 +115,8 @@ type IfName struct {
|
|||
sigsLock sync.Mutex `toml:"-"`
|
||||
}
|
||||
|
||||
const minRetry time.Duration = 5 * time.Minute
|
||||
|
||||
func (d *IfName) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
|
@ -118,7 +129,7 @@ func (d *IfName) Init() error {
|
|||
d.getMapRemote = d.getMapRemoteNoMock
|
||||
d.makeTable = makeTableNoMock
|
||||
|
||||
c := NewLRUCache(d.CacheSize)
|
||||
c := NewTTLCache(time.Duration(d.CacheTTL), d.CacheSize)
|
||||
d.cache = &c
|
||||
|
||||
d.sigs = make(sigMap)
|
||||
|
|
@ -144,17 +155,42 @@ func (d *IfName) addTag(metric telegraf.Metric) error {
|
|||
return fmt.Errorf("couldn't parse source tag as uint")
|
||||
}
|
||||
|
||||
m, err := d.getMap(agent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't retrieve the table of interface names: %w", err)
|
||||
}
|
||||
firstTime := true
|
||||
for {
|
||||
m, age, err := d.getMap(agent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't retrieve the table of interface names: %w", err)
|
||||
}
|
||||
|
||||
name, found := m[num]
|
||||
if !found {
|
||||
return fmt.Errorf("interface number %d isn't in the table of interface names", num)
|
||||
name, found := m[num]
|
||||
if found {
|
||||
// success
|
||||
metric.AddTag(d.DestTag, name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// We have the agent's interface map but it doesn't contain
|
||||
// the interface we're interested in. If the entry is old
|
||||
// enough, retrieve it from the agent once more.
|
||||
if age < minRetry {
|
||||
return fmt.Errorf("interface number %d isn't in the table of interface names", num)
|
||||
}
|
||||
|
||||
if firstTime {
|
||||
d.invalidate(agent)
|
||||
firstTime = false
|
||||
continue
|
||||
}
|
||||
|
||||
// not found, cache hit, retrying
|
||||
return fmt.Errorf("missing interface but couldn't retrieve table")
|
||||
}
|
||||
metric.AddTag(d.DestTag, name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *IfName) invalidate(agent string) {
|
||||
d.rwLock.RLock()
|
||||
d.cache.Delete(agent)
|
||||
d.rwLock.RUnlock()
|
||||
}
|
||||
|
||||
func (d *IfName) Start(acc telegraf.Accumulator) error {
|
||||
|
|
@ -203,15 +239,15 @@ func (d *IfName) Stop() error {
|
|||
|
||||
// getMap gets the interface names map either from cache or from the SNMP
|
||||
// agent
|
||||
func (d *IfName) getMap(agent string) (nameMap, error) {
|
||||
func (d *IfName) getMap(agent string) (entry nameMap, age time.Duration, err error) {
|
||||
var sig chan struct{}
|
||||
|
||||
// Check cache
|
||||
d.rwLock.RLock()
|
||||
m, ok := d.cache.Get(agent)
|
||||
m, ok, age := d.cache.Get(agent)
|
||||
d.rwLock.RUnlock()
|
||||
if ok {
|
||||
return m, nil
|
||||
return m, age, nil
|
||||
}
|
||||
|
||||
// Is this the first request for this agent?
|
||||
|
|
@ -229,12 +265,12 @@ func (d *IfName) getMap(agent string) (nameMap, error) {
|
|||
<-sig
|
||||
// Check cache again
|
||||
d.rwLock.RLock()
|
||||
m, ok := d.cache.Get(agent)
|
||||
m, ok, age := d.cache.Get(agent)
|
||||
d.rwLock.RUnlock()
|
||||
if ok {
|
||||
return m, nil
|
||||
return m, age, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("getting remote table from cache")
|
||||
return nil, 0, fmt.Errorf("getting remote table from cache")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -242,7 +278,7 @@ func (d *IfName) getMap(agent string) (nameMap, error) {
|
|||
// agent.
|
||||
|
||||
// Make the SNMP request
|
||||
m, err := d.getMapRemote(agent)
|
||||
m, err = d.getMapRemote(agent)
|
||||
if err != nil {
|
||||
//failure. signal without saving to cache
|
||||
d.sigsLock.Lock()
|
||||
|
|
@ -250,7 +286,7 @@ func (d *IfName) getMap(agent string) (nameMap, error) {
|
|||
delete(d.sigs, agent)
|
||||
d.sigsLock.Unlock()
|
||||
|
||||
return nil, fmt.Errorf("getting remote table: %w", err)
|
||||
return nil, 0, fmt.Errorf("getting remote table: %w", err)
|
||||
}
|
||||
|
||||
// Cache it
|
||||
|
|
@ -264,7 +300,7 @@ func (d *IfName) getMap(agent string) (nameMap, error) {
|
|||
delete(d.sigs, agent)
|
||||
d.sigsLock.Unlock()
|
||||
|
||||
return m, nil
|
||||
return m, 0, nil
|
||||
}
|
||||
|
||||
func (d *IfName) getMapRemoteNoMock(agent string) (nameMap, error) {
|
||||
|
|
@ -310,6 +346,7 @@ func init() {
|
|||
Version: 2,
|
||||
Community: "public",
|
||||
},
|
||||
CacheTTL: config.Duration(8 * time.Hour),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/snmp"
|
||||
si "github.com/influxdata/telegraf/plugins/inputs/snmp"
|
||||
|
|
@ -100,6 +101,7 @@ func TestGetMap(t *testing.T) {
|
|||
Version: 2,
|
||||
Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout
|
||||
},
|
||||
CacheTTL: config.Duration(10 * time.Second),
|
||||
}
|
||||
|
||||
// This test mocks the snmp transaction so don't run net-snmp
|
||||
|
|
@ -137,7 +139,7 @@ func TestGetMap(t *testing.T) {
|
|||
wgReq.Add(1)
|
||||
go func() {
|
||||
defer wgReq.Done()
|
||||
m, err := d.getMap("agent")
|
||||
m, _, err := d.getMap("agent")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, m)
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
package ifname
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type TTLValType struct {
|
||||
time time.Time // when entry was added
|
||||
val valType
|
||||
}
|
||||
|
||||
type timeFunc func() time.Time
|
||||
|
||||
type TTLCache struct {
|
||||
validDuration time.Duration
|
||||
lru LRUCache
|
||||
now timeFunc
|
||||
}
|
||||
|
||||
func NewTTLCache(valid time.Duration, capacity uint) TTLCache {
|
||||
return TTLCache{
|
||||
lru: NewLRUCache(capacity),
|
||||
validDuration: valid,
|
||||
now: time.Now,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TTLCache) Get(key keyType) (valType, bool, time.Duration) {
|
||||
v, ok := c.lru.Get(key)
|
||||
if !ok {
|
||||
return valType{}, false, 0
|
||||
}
|
||||
age := c.now().Sub(v.time)
|
||||
if age < c.validDuration {
|
||||
return v.val, ok, age
|
||||
} else {
|
||||
c.lru.Delete(key)
|
||||
return valType{}, false, 0
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TTLCache) Put(key keyType, value valType) {
|
||||
v := TTLValType{
|
||||
val: value,
|
||||
time: c.now(),
|
||||
}
|
||||
c.lru.Put(key, v)
|
||||
}
|
||||
|
||||
func (c *TTLCache) Delete(key keyType) {
|
||||
c.lru.Delete(key)
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package ifname
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTTLCacheExpire(t *testing.T) {
|
||||
c := NewTTLCache(1*time.Second, 100)
|
||||
|
||||
c.now = func() time.Time {
|
||||
return time.Unix(0, 0)
|
||||
}
|
||||
|
||||
c.Put("ones", nameMap{1: "one"})
|
||||
require.Len(t, c.lru.m, 1)
|
||||
|
||||
c.now = func() time.Time {
|
||||
return time.Unix(1, 0)
|
||||
}
|
||||
|
||||
_, ok, _ := c.Get("ones")
|
||||
require.False(t, ok)
|
||||
require.Len(t, c.lru.m, 0)
|
||||
require.Equal(t, c.lru.l.Len(), 0)
|
||||
}
|
||||
|
||||
func TestTTLCache(t *testing.T) {
|
||||
c := NewTTLCache(1*time.Second, 100)
|
||||
|
||||
c.now = func() time.Time {
|
||||
return time.Unix(0, 0)
|
||||
}
|
||||
|
||||
expected := nameMap{1: "one"}
|
||||
c.Put("ones", expected)
|
||||
|
||||
actual, ok, _ := c.Get("ones")
|
||||
require.True(t, ok)
|
||||
require.Equal(t, expected, actual)
|
||||
}
|
||||
Loading…
Reference in New Issue