diff --git a/go.mod b/go.mod
index e9c8d73c8..2927f8b82 100644
--- a/go.mod
+++ b/go.mod
@@ -131,7 +131,7 @@ require (
 	golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
 	golang.org/x/net v0.0.0-20200301022130-244492dfa37a
 	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
-	golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a // indirect
+	golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
 	golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4
 	golang.org/x/tools v0.0.0-20200317043434-63da46f3035e // indirect
 	golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4
diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go
index ad8d8616c..307fe540b 100644
--- a/plugins/processors/all/all.go
+++ b/plugins/processors/all/all.go
@@ -16,6 +16,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/processors/printer"
 	_ "github.com/influxdata/telegraf/plugins/processors/regex"
 	_ "github.com/influxdata/telegraf/plugins/processors/rename"
+	_ "github.com/influxdata/telegraf/plugins/processors/reverse_dns"
 	_ "github.com/influxdata/telegraf/plugins/processors/s2geo"
 	_ "github.com/influxdata/telegraf/plugins/processors/starlark"
 	_ "github.com/influxdata/telegraf/plugins/processors/strings"
diff --git a/plugins/processors/reverse_dns/README.md b/plugins/processors/reverse_dns/README.md
new file mode 100644
index 000000000..b95174ad2
--- /dev/null
+++ b/plugins/processors/reverse_dns/README.md
@@ -0,0 +1,72 @@
+# Reverse DNS Processor Plugin
+
+The `reverse_dns` processor does a reverse-dns lookup on tags (or fields) with
+IPs in them.
+
+### Configuration:
+
+```toml
+[[processors.reverse_dns]]
+  ## For optimal performance, you may want to limit which metrics are passed to this
+  ## processor. eg:
+  ## namepass = ["my_metric_*"]
+
+  ## cache_ttl is how long the dns entries should stay cached for.
+  ## generally longer is better, but if you expect a large number of diverse lookups
+  ## you'll want to consider memory use.
+  cache_ttl = "24h"
+
+  ## lookup_timeout is how long to wait for a single dns request to respond.
+  ## this is also the maximum acceptable latency for a metric travelling through
+  ## the reverse_dns processor. After lookup_timeout is exceeded, a metric will
+  ## be passed on unaltered.
+  ## multiple simultaneous resolution requests for the same IP will only make a
+  ## single rDNS request, and they will all wait for the answer for this long.
+  lookup_timeout = "3s"
+
+  ## max_parallel_lookups is the maximum number of dns requests to be in flight
+  ## at the same time. Requests hitting cached values do not count against this
+  ## total, and neither do multiple requests for the same IP.
+  ## It's probably best to keep this number fairly low.
+  max_parallel_lookups = 10
+
+  ## ordered controls whether or not the metrics need to stay in the same order
+  ## this plugin received them in. If false, this plugin will change the order,
+  ## with requests hitting cached results moving through immediately and not
+  ## waiting on slower lookups. This may cause issues for you if you are
+  ## depending on the order of metrics staying the same. If so, set this to true.
+  ## Keeping the metrics ordered may be slightly slower.
+ ordered = false + + [[processors.reverse_dns.lookup]] + ## get the ip from the field "source_ip", and put the result in the field "source_name" + field = "source_ip" + dest = "source_name" + + [[processors.reverse_dns.lookup]] + ## get the ip from the tag "destination_ip", and put the result in the tag + ## "destination_name". + tag = "destination_ip" + dest = "destination_name" + + ## If you would prefer destination_name to be a field instead, you can use a + ## processors.converter after this one, specifying the order attribute. +``` + + + +### Example processing: + +example config: + +```toml +[[processors.reverse_dns]] + [[processors.reverse_dns.lookup]] + tag = "ip" + dest = "domain" +``` + +```diff +- ping,ip=8.8.8.8 elapsed=300i 1502489900000000000 ++ ping,ip=8.8.8.8,domain=dns.google. elapsed=300i 1502489900000000000 +``` diff --git a/plugins/processors/reverse_dns/parallel/ordered.go b/plugins/processors/reverse_dns/parallel/ordered.go new file mode 100644 index 000000000..763df2db6 --- /dev/null +++ b/plugins/processors/reverse_dns/parallel/ordered.go @@ -0,0 +1,89 @@ +package parallel + +import ( + "sync" + + "github.com/influxdata/telegraf" +) + +type Ordered struct { + wg sync.WaitGroup + fn func(telegraf.Metric) []telegraf.Metric + + // queue of jobs coming in. Workers pick jobs off this queue for processing + workerQueue chan job + + // queue of ordered metrics going out + queue chan futureMetric +} + +func NewOrdered( + acc telegraf.Accumulator, + fn func(telegraf.Metric) []telegraf.Metric, + orderedQueueSize int, + workerCount int, +) *Ordered { + p := &Ordered{ + fn: fn, + workerQueue: make(chan job, workerCount), + queue: make(chan futureMetric, orderedQueueSize), + } + p.startWorkers(workerCount) + p.wg.Add(1) + go func() { + p.readQueue(acc) + p.wg.Done() + }() + return p +} + +func (p *Ordered) Enqueue(metric telegraf.Metric) { + future := make(futureMetric) + p.queue <- future + + // write the future to the worker pool. 
Order doesn't matter now because the + // outgoing p.queue will enforce order regardless of the order the jobs are + // completed in + p.workerQueue <- job{ + future: future, + metric: metric, + } +} + +func (p *Ordered) readQueue(acc telegraf.Accumulator) { + // wait for the response from each worker in order + for mCh := range p.queue { + // allow each worker to write out multiple metrics + for metrics := range mCh { + for _, m := range metrics { + acc.AddMetric(m) + } + } + } +} + +func (p *Ordered) startWorkers(count int) { + p.wg.Add(count) + for i := 0; i < count; i++ { + go func() { + for job := range p.workerQueue { + job.future <- p.fn(job.metric) + close(job.future) + } + p.wg.Done() + }() + } +} + +func (p *Ordered) Stop() { + close(p.queue) + close(p.workerQueue) + p.wg.Wait() +} + +type futureMetric chan []telegraf.Metric + +type job struct { + future futureMetric + metric telegraf.Metric +} diff --git a/plugins/processors/reverse_dns/parallel/parallel.go b/plugins/processors/reverse_dns/parallel/parallel.go new file mode 100644 index 000000000..f3ad04c72 --- /dev/null +++ b/plugins/processors/reverse_dns/parallel/parallel.go @@ -0,0 +1,8 @@ +package parallel + +import "github.com/influxdata/telegraf" + +type Parallel interface { + Enqueue(telegraf.Metric) + Stop() +} diff --git a/plugins/processors/reverse_dns/parallel/parallel_test.go b/plugins/processors/reverse_dns/parallel/parallel_test.go new file mode 100644 index 000000000..0d2839a24 --- /dev/null +++ b/plugins/processors/reverse_dns/parallel/parallel_test.go @@ -0,0 +1,119 @@ +package parallel_test + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestOrderedJobsStayOrdered(t *testing.T) { + acc := &testutil.Accumulator{} + + p := parallel.NewOrdered(acc, jobFunc, 10000, 10) + now := time.Now() + for i := 0; i < 20000; i++ { + m, err := metric.New("test", + map[string]string{}, + map[string]interface{}{ + "val": i, + }, + now, + ) + require.NoError(t, err) + now = now.Add(1) + p.Enqueue(m) + } + p.Stop() + + i := 0 + require.Len(t, acc.Metrics, 20000, fmt.Sprintf("expected 20k metrics but got %d", len(acc.GetTelegrafMetrics()))) + for _, m := range acc.GetTelegrafMetrics() { + v, ok := m.GetField("val") + require.True(t, ok) + require.EqualValues(t, i, v) + i++ + } +} + +func TestUnorderedJobsDontDropAnyJobs(t *testing.T) { + acc := &testutil.Accumulator{} + + p := parallel.NewUnordered(acc, jobFunc, 10) + + now := time.Now() + + expectedTotal := 0 + for i := 0; i < 20000; i++ { + expectedTotal += i + m, err := metric.New("test", + map[string]string{}, + map[string]interface{}{ + "val": i, + }, + now, + ) + require.NoError(t, err) + now = now.Add(1) + p.Enqueue(m) + } + p.Stop() + + actualTotal := int64(0) + require.Len(t, acc.Metrics, 20000, fmt.Sprintf("expected 20k metrics but got %d", len(acc.GetTelegrafMetrics()))) + for _, m := range acc.GetTelegrafMetrics() { + v, ok := m.GetField("val") + require.True(t, ok) + actualTotal += v.(int64) + } + require.EqualValues(t, expectedTotal, actualTotal) +} + +func BenchmarkOrdered(b *testing.B) { + acc := &testutil.Accumulator{} + + p := parallel.NewOrdered(acc, jobFunc, 10000, 10) + + m, _ := metric.New("test", + map[string]string{}, + map[string]interface{}{ + "val": 1, + }, + time.Now(), + ) + + b.ResetTimer() + for i := 0; i < b.N; 
i++ { + p.Enqueue(m) + } + p.Stop() +} + +func BenchmarkUnordered(b *testing.B) { + acc := &testutil.Accumulator{} + + p := parallel.NewUnordered(acc, jobFunc, 10) + + m, _ := metric.New("test", + map[string]string{}, + map[string]interface{}{ + "val": 1, + }, + time.Now(), + ) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + p.Enqueue(m) + } + p.Stop() +} + +func jobFunc(m telegraf.Metric) []telegraf.Metric { + return []telegraf.Metric{m} +} diff --git a/plugins/processors/reverse_dns/parallel/unordered.go b/plugins/processors/reverse_dns/parallel/unordered.go new file mode 100644 index 000000000..eef6e1f7c --- /dev/null +++ b/plugins/processors/reverse_dns/parallel/unordered.go @@ -0,0 +1,60 @@ +package parallel + +import ( + "sync" + + "github.com/influxdata/telegraf" +) + +type Unordered struct { + wg sync.WaitGroup + acc telegraf.Accumulator + fn func(telegraf.Metric) []telegraf.Metric + inQueue chan telegraf.Metric +} + +func NewUnordered( + acc telegraf.Accumulator, + fn func(telegraf.Metric) []telegraf.Metric, + workerCount int, +) *Unordered { + p := &Unordered{ + acc: acc, + inQueue: make(chan telegraf.Metric, workerCount), + fn: fn, + } + + // start workers + p.wg.Add(1) + go func() { + p.startWorkers(workerCount) + p.wg.Done() + }() + + return p +} + +func (p *Unordered) startWorkers(count int) { + wg := sync.WaitGroup{} + wg.Add(count) + for i := 0; i < count; i++ { + go func() { + for metric := range p.inQueue { + for _, m := range p.fn(metric) { + p.acc.AddMetric(m) + } + } + wg.Done() + }() + } + wg.Wait() +} + +func (p *Unordered) Stop() { + close(p.inQueue) + p.wg.Wait() +} + +func (p *Unordered) Enqueue(m telegraf.Metric) { + p.inQueue <- m +} diff --git a/plugins/processors/reverse_dns/rdnscache.go b/plugins/processors/reverse_dns/rdnscache.go new file mode 100644 index 000000000..1d86b5385 --- /dev/null +++ b/plugins/processors/reverse_dns/rdnscache.go @@ -0,0 +1,319 @@ +package reverse_dns + +import ( + "context" + "errors" + "net" + "sync" + "sync/atomic" + "time" + + "golang.org/x/sync/semaphore" +) + +const defaultMaxWorkers = 10 + +var ( + ErrTimeout = errors.New("request timed out") +) + +// AnyResolver is for the net.Resolver +type AnyResolver interface { + LookupAddr(ctx context.Context, addr string) (names []string, err error) +} + +// ReverseDNSCache is safe to use across multiple goroutines. +// if multiple goroutines request the same IP at the same time, one of the +// requests will trigger the lookup and the rest will wait for its response. +type ReverseDNSCache struct { + Resolver AnyResolver + stats RDNSCacheStats + + // settings + ttl time.Duration + lookupTimeout time.Duration + maxWorkers int + + // internal + rwLock sync.RWMutex + sem *semaphore.Weighted + cancelCleanupWorker context.CancelFunc + + cache map[string]*dnslookup + + // keep an ordered list of what needs to be worked on and what is due to expire. + // We can use this list for both with a job position marker, and by popping items + // off the list as they expire. This avoids iterating over the whole map to find + // things to do. + // As a bonus, we only have to read the first item to know if anything in the + // map has expired. + // must lock to get access to this. 
+ expireList []*dnslookup + expireListLock sync.Mutex +} + +type RDNSCacheStats struct { + CacheHit uint64 + CacheMiss uint64 + CacheExpire uint64 + RequestsAbandoned uint64 + RequestsFilled uint64 +} + +func NewReverseDNSCache(ttl, lookupTimeout time.Duration, workerPoolSize int) *ReverseDNSCache { + if workerPoolSize <= 0 { + workerPoolSize = defaultMaxWorkers + } + ctx, cancel := context.WithCancel(context.Background()) + d := &ReverseDNSCache{ + ttl: ttl, + lookupTimeout: lookupTimeout, + cache: map[string]*dnslookup{}, + expireList: []*dnslookup{}, + maxWorkers: workerPoolSize, + sem: semaphore.NewWeighted(int64(workerPoolSize)), + cancelCleanupWorker: cancel, + Resolver: net.DefaultResolver, + } + d.startCleanupWorker(ctx) + return d +} + +// dnslookup represents a lookup request/response. It may or may not be answered yet. +// interested parties register themselves with existing requests or create new ones +// to get their dns query answered. Answers will be pushed out to callbacks. +type dnslookup struct { + ip string // keep a copy for the expireList. + domains []string + expiresAt time.Time + completed bool + callbacks []callbackChannelType +} + +type lookupResult struct { + domains []string + err error +} + +type callbackChannelType chan lookupResult + +// Lookup takes a string representing a parseable ipv4 or ipv6 IP, and blocks +// until it has resolved to 0-n results, or until its lookup timeout has elapsed. +// if the lookup timeout elapses, it returns an empty slice. +func (d *ReverseDNSCache) Lookup(ip string) ([]string, error) { + if len(ip) == 0 { + return nil, nil + } + return d.lookup(ip) +} + +func (d *ReverseDNSCache) lookup(ip string) ([]string, error) { + // check if the value is cached + d.rwLock.RLock() + result, found := d.lockedGetFromCache(ip) + if found && result.completed && result.expiresAt.After(time.Now()) { + defer d.rwLock.RUnlock() + atomic.AddUint64(&d.stats.CacheHit, 1) + // cache is valid + return result.domains, nil + } + d.rwLock.RUnlock() + + // if it's not cached, kick off a lookup job and subscribe to the result. + lookupChan := d.subscribeTo(ip) + timer := time.NewTimer(d.lookupTimeout) + defer timer.Stop() + + // timer is still necessary even if doLookup respects timeout due to worker + // pool starvation. + select { + case result := <-lookupChan: + return result.domains, result.err + case <-timer.C: + return nil, ErrTimeout + } +} + +func (d *ReverseDNSCache) subscribeTo(ip string) callbackChannelType { + callback := make(callbackChannelType, 1) + + d.rwLock.Lock() + defer d.rwLock.Unlock() + + // confirm it's still not in the cache. This needs to be done under an active lock. + result, found := d.lockedGetFromCache(ip) + if found { + atomic.AddUint64(&d.stats.CacheHit, 1) + // has the request been answered since we last checked? + if result.completed { + // we can return the answer with the channel. + callback <- lookupResult{domains: result.domains} + return callback + } + // there's a request but it hasn't been answered yet; + // add yourself to the subscribers and return that. + result.callbacks = append(result.callbacks, callback) + d.lockedSaveToCache(result) + return callback + } + + atomic.AddUint64(&d.stats.CacheMiss, 1) + + // otherwise we need to register the request + l := &dnslookup{ + ip: ip, + expiresAt: time.Now().Add(d.ttl), + callbacks: []callbackChannelType{callback}, + } + + d.lockedSaveToCache(l) + go d.doLookup(l.ip) + return callback +} + +// lockedGetFromCache fetches from the correct internal ip cache. 
+// you MUST first do a read or write lock before calling it, and keep locks around +// the dnslookup that is returned until you clone it. +func (d *ReverseDNSCache) lockedGetFromCache(ip string) (lookup *dnslookup, found bool) { + lookup, found = d.cache[ip] + if found && lookup.expiresAt.Before(time.Now()) { + return nil, false + } + return lookup, found +} + +// lockedSaveToCache stores a lookup in the correct internal ip cache. +// you MUST first do a write lock before calling it. +func (d *ReverseDNSCache) lockedSaveToCache(lookup *dnslookup) { + if lookup.expiresAt.Before(time.Now()) { + return // don't cache. + } + d.cache[lookup.ip] = lookup +} + +func (d *ReverseDNSCache) startCleanupWorker(ctx context.Context) { + go func() { + cleanupTick := time.NewTicker(10 * time.Second) + for { + select { + case <-cleanupTick.C: + d.cleanup() + case <-ctx.Done(): + return + } + } + }() +} + +func (d *ReverseDNSCache) doLookup(ip string) { + ctx, cancel := context.WithTimeout(context.Background(), d.lookupTimeout) + defer cancel() + if err := d.sem.Acquire(ctx, 1); err != nil { + // lookup timeout + d.abandonLookup(ip, ErrTimeout) + return + } + defer d.sem.Release(1) + + names, err := d.Resolver.LookupAddr(ctx, ip) + if err != nil { + d.abandonLookup(ip, err) + return + } + + d.rwLock.Lock() + lookup, found := d.lockedGetFromCache(ip) + if !found { + d.rwLock.Unlock() + return + } + + lookup.domains = names + lookup.completed = true + lookup.expiresAt = time.Now().Add(d.ttl) // extend the ttl now that we have a reply. + callbacks := lookup.callbacks + lookup.callbacks = nil + + d.lockedSaveToCache(lookup) + d.rwLock.Unlock() + + d.expireListLock.Lock() + // add it to the expireList. + d.expireList = append(d.expireList, lookup) + d.expireListLock.Unlock() + + atomic.AddUint64(&d.stats.RequestsFilled, uint64(len(callbacks))) + for _, cb := range callbacks { + cb <- lookupResult{domains: names} + close(cb) + } +} + +func (d *ReverseDNSCache) abandonLookup(ip string, err error) { + d.rwLock.Lock() + lookup, found := d.lockedGetFromCache(ip) + if !found { + d.rwLock.Unlock() + return + } + + callbacks := lookup.callbacks + delete(d.cache, lookup.ip) + d.rwLock.Unlock() + // resolve the remaining callbacks to free the resources. + atomic.AddUint64(&d.stats.RequestsAbandoned, uint64(len(callbacks))) + for _, cb := range callbacks { + cb <- lookupResult{err: err} + close(cb) + } +} + +func (d *ReverseDNSCache) cleanup() { + now := time.Now() + d.expireListLock.Lock() + if len(d.expireList) == 0 { + d.expireListLock.Unlock() + return + } + ipsToDelete := []string{} + for i := 0; i < len(d.expireList); i++ { + if d.expireList[i].expiresAt.After(now) { + break // done. Nothing after this point is expired. + } + ipsToDelete = append(ipsToDelete, d.expireList[i].ip) + } + if len(ipsToDelete) == 0 { + d.expireListLock.Unlock() + return + } + d.expireList = d.expireList[len(ipsToDelete):] + d.expireListLock.Unlock() + + atomic.AddUint64(&d.stats.CacheExpire, uint64(len(ipsToDelete))) + + d.rwLock.Lock() + defer d.rwLock.Unlock() + for _, ip := range ipsToDelete { + delete(d.cache, ip) + } +} + +// blockAllWorkers is a test function that eats up all the worker pool space to +// make sure workers are done running and there's no room to acquire a new worker. 
+func (d *ReverseDNSCache) blockAllWorkers() { + d.sem.Acquire(context.Background(), int64(d.maxWorkers)) +} + +func (d *ReverseDNSCache) Stats() RDNSCacheStats { + stats := RDNSCacheStats{} + stats.CacheHit = atomic.LoadUint64(&d.stats.CacheHit) + stats.CacheMiss = atomic.LoadUint64(&d.stats.CacheMiss) + stats.CacheExpire = atomic.LoadUint64(&d.stats.CacheExpire) + stats.RequestsAbandoned = atomic.LoadUint64(&d.stats.RequestsAbandoned) + stats.RequestsFilled = atomic.LoadUint64(&d.stats.RequestsFilled) + return stats +} + +func (d *ReverseDNSCache) Stop() { + d.cancelCleanupWorker() +} diff --git a/plugins/processors/reverse_dns/rdnscache_test.go b/plugins/processors/reverse_dns/rdnscache_test.go new file mode 100644 index 000000000..e8466c27f --- /dev/null +++ b/plugins/processors/reverse_dns/rdnscache_test.go @@ -0,0 +1,136 @@ +package reverse_dns + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSimpleReverseDNSLookup(t *testing.T) { + d := NewReverseDNSCache(60*time.Second, 1*time.Second, -1) + defer d.Stop() + + d.Resolver = &localResolver{} + answer, err := d.Lookup("127.0.0.1") + require.NoError(t, err) + require.Equal(t, []string{"localhost"}, answer) + d.blockAllWorkers() + + // do another request with no workers available. + // it should read from cache instantly. + answer, err = d.Lookup("127.0.0.1") + require.NoError(t, err) + require.Equal(t, []string{"localhost"}, answer) + + require.Len(t, d.cache, 1) + require.Len(t, d.expireList, 1) + d.cleanup() + require.Len(t, d.expireList, 1) // ttl hasn't hit yet. + + stats := d.Stats() + + require.EqualValues(t, 0, stats.CacheExpire) + require.EqualValues(t, 1, stats.CacheMiss) + require.EqualValues(t, 1, stats.CacheHit) + require.EqualValues(t, 1, stats.RequestsFilled) + require.EqualValues(t, 0, stats.RequestsAbandoned) +} + +func TestParallelReverseDNSLookup(t *testing.T) { + d := NewReverseDNSCache(1*time.Second, 1*time.Second, -1) + defer d.Stop() + + d.Resolver = &localResolver{} + var answer1 []string + var answer2 []string + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + answer, err := d.Lookup("127.0.0.1") + require.NoError(t, err) + answer1 = answer + wg.Done() + }() + go func() { + answer, err := d.Lookup("127.0.0.1") + require.NoError(t, err) + answer2 = answer + wg.Done() + }() + + wg.Wait() + + t.Log(answer1) + t.Log(answer2) + + require.Equal(t, []string{"localhost"}, answer1) + require.Equal(t, []string{"localhost"}, answer2) + + require.Len(t, d.cache, 1) + + stats := d.Stats() + + require.EqualValues(t, 1, stats.CacheMiss) + require.EqualValues(t, 1, stats.CacheHit) +} + +func TestUnavailableDNSServerRespectsTimeout(t *testing.T) { + d := NewReverseDNSCache(0, 1, -1) + defer d.Stop() + + d.Resolver = &timeoutResolver{} + + result, err := d.Lookup("192.153.33.3") + require.Error(t, err) + require.Equal(t, ErrTimeout, err) + + require.Nil(t, result) +} + +func TestCleanupHappens(t *testing.T) { + ttl := 100 * time.Millisecond + d := NewReverseDNSCache(ttl, 1*time.Second, -1) + defer d.Stop() + + d.Resolver = &localResolver{} + _, err := d.Lookup("127.0.0.1") + require.NoError(t, err) + + require.Len(t, d.cache, 1) + + time.Sleep(ttl) // wait for cache entry to expire. 
+	d.cleanup()
+	require.Len(t, d.expireList, 0)
+
+	stats := d.Stats()
+
+	require.EqualValues(t, 1, stats.CacheExpire)
+	require.EqualValues(t, 1, stats.CacheMiss)
+	require.EqualValues(t, 0, stats.CacheHit)
+}
+
+func TestLookupTimeout(t *testing.T) {
+	d := NewReverseDNSCache(10*time.Second, 10*time.Second, -1)
+	defer d.Stop()
+
+	d.Resolver = &timeoutResolver{}
+	_, err := d.Lookup("127.0.0.1")
+	require.Error(t, err)
+	require.EqualValues(t, 1, d.Stats().RequestsAbandoned)
+}
+
+type timeoutResolver struct{}
+
+func (r *timeoutResolver) LookupAddr(ctx context.Context, addr string) (names []string, err error) {
+	return nil, errors.New("timeout")
+}
+
+type localResolver struct{}
+
+func (r *localResolver) LookupAddr(ctx context.Context, addr string) (names []string, err error) {
+	return []string{"localhost"}, nil
+}
diff --git a/plugins/processors/reverse_dns/reversedns.go b/plugins/processors/reverse_dns/reversedns.go
new file mode 100644
index 000000000..bef79a01c
--- /dev/null
+++ b/plugins/processors/reverse_dns/reversedns.go
@@ -0,0 +1,156 @@
+package reverse_dns
+
+import (
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/config"
+	"github.com/influxdata/telegraf/plugins/processors"
+	"github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
+)
+
+const sampleConfig = `
+  ## For optimal performance, you may want to limit which metrics are passed to this
+  ## processor. eg:
+  ## namepass = ["my_metric_*"]
+
+  ## cache_ttl is how long the dns entries should stay cached for.
+  ## generally longer is better, but if you expect a large number of diverse lookups
+  ## you'll want to consider memory use.
+  cache_ttl = "24h"
+
+  ## lookup_timeout is how long to wait for a single dns request to respond.
+  ## this is also the maximum acceptable latency for a metric travelling through
+  ## the reverse_dns processor. After lookup_timeout is exceeded, a metric will
+  ## be passed on unaltered.
+  ## multiple simultaneous resolution requests for the same IP will only make a
+  ## single rDNS request, and they will all wait for the answer for this long.
+  lookup_timeout = "3s"
+
+  ## max_parallel_lookups is the maximum number of dns requests to be in flight
+  ## at the same time. Requests hitting cached values do not count against this
+  ## total, and neither do multiple requests for the same IP.
+  ## It's probably best to keep this number fairly low.
+  max_parallel_lookups = 10
+
+  ## ordered controls whether or not the metrics need to stay in the same order
+  ## this plugin received them in. If false, this plugin will change the order,
+  ## with requests hitting cached results moving through immediately and not
+  ## waiting on slower lookups. This may cause issues for you if you are
+  ## depending on the order of metrics staying the same. If so, set this to true.
+  ## Keeping the metrics ordered may be slightly slower.
+  ordered = false
+
+  [[processors.reverse_dns.lookup]]
+    ## get the ip from the field "source_ip", and put the result in the field "source_name"
+    field = "source_ip"
+    dest = "source_name"
+
+  [[processors.reverse_dns.lookup]]
+    ## get the ip from the tag "destination_ip", and put the result in the tag
+    ## "destination_name".
+    tag = "destination_ip"
+    dest = "destination_name"
+
+  ## If you would prefer destination_name to be a field instead, you can use a
+  ## processors.converter after this one, specifying the order attribute.
+` + +type lookupEntry struct { + Tag string `toml:"tag"` + Field string `toml:"field"` + Dest string `toml:"dest"` +} + +type ReverseDNS struct { + reverseDNSCache *ReverseDNSCache + acc telegraf.Accumulator + parallel parallel.Parallel + + Lookups []lookupEntry `toml:"lookup"` + CacheTTL config.Duration `toml:"cache_ttl"` + LookupTimeout config.Duration `toml:"lookup_timeout"` + MaxParallelLookups int `toml:"max_parallel_lookups"` + Ordered bool `toml:"ordered"` + Log telegraf.Logger `toml:"-"` +} + +func (r *ReverseDNS) SampleConfig() string { + return sampleConfig +} + +func (r *ReverseDNS) Description() string { + return "ReverseDNS does a reverse lookup on IP addresses to retrieve the DNS name" +} + +func (r *ReverseDNS) Start(acc telegraf.Accumulator) error { + r.acc = acc + r.reverseDNSCache = NewReverseDNSCache( + time.Duration(r.CacheTTL), + time.Duration(r.LookupTimeout), + r.MaxParallelLookups, // max parallel reverse-dns lookups + ) + if r.Ordered { + r.parallel = parallel.NewOrdered(acc, r.asyncAdd, 10000, r.MaxParallelLookups) + } else { + r.parallel = parallel.NewUnordered(acc, r.asyncAdd, r.MaxParallelLookups) + } + return nil +} + +func (r *ReverseDNS) Stop() error { + r.parallel.Stop() + r.reverseDNSCache.Stop() + return nil +} + +func (r *ReverseDNS) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { + r.parallel.Enqueue(metric) + return nil +} + +func (r *ReverseDNS) asyncAdd(metric telegraf.Metric) []telegraf.Metric { + for _, lookup := range r.Lookups { + if len(lookup.Field) > 0 { + if ipField, ok := metric.GetField(lookup.Field); ok { + if ip, ok := ipField.(string); ok { + result, err := r.reverseDNSCache.Lookup(ip) + if err != nil { + r.Log.Errorf("lookup error: %v", err) + continue + } + if len(result) > 0 { + metric.AddField(lookup.Dest, result[0]) + } + } + } + } + if len(lookup.Tag) > 0 { + if ipTag, ok := metric.GetTag(lookup.Tag); ok { + result, err := r.reverseDNSCache.Lookup(ipTag) + if err != nil { + r.Log.Errorf("lookup error: %v", err) + continue + } + if len(result) > 0 { + metric.AddTag(lookup.Dest, result[0]) + } + } + } + } + return []telegraf.Metric{metric} +} + +func init() { + processors.AddStreaming("reverse_dns", func() telegraf.StreamingProcessor { + return newReverseDNS() + }) +} + +func newReverseDNS() *ReverseDNS { + return &ReverseDNS{ + CacheTTL: config.Duration(24 * time.Hour), + LookupTimeout: config.Duration(1 * time.Minute), + MaxParallelLookups: 10, + } +} diff --git a/plugins/processors/reverse_dns/reversedns_test.go b/plugins/processors/reverse_dns/reversedns_test.go new file mode 100644 index 000000000..d4b78ce8e --- /dev/null +++ b/plugins/processors/reverse_dns/reversedns_test.go @@ -0,0 +1,55 @@ +package reverse_dns + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestSimpleReverseLookup(t *testing.T) { + now := time.Now() + m, _ := metric.New("name", map[string]string{ + "dest_ip": "8.8.8.8", + }, map[string]interface{}{ + "source_ip": "127.0.0.1", + }, now) + + dns := newReverseDNS() + dns.Lookups = []lookupEntry{ + { + Field: "source_ip", + Dest: "source_name", + }, + { + Tag: "dest_ip", + Dest: "dest_name", + }, + } + acc := &testutil.Accumulator{} + dns.Start(acc) + dns.Add(m, acc) + dns.Stop() + // should be processed now. 
+ + require.Len(t, acc.GetTelegrafMetrics(), 1) + processedMetric := acc.GetTelegrafMetrics()[0] + f, ok := processedMetric.GetField("source_name") + require.True(t, ok) + require.EqualValues(t, "localhost", f) + + tag, ok := processedMetric.GetTag("dest_name") + require.True(t, ok) + require.EqualValues(t, "dns.google.", tag) +} + +func TestLoadingConfig(t *testing.T) { + c := config.NewConfig() + err := c.LoadConfigData([]byte("[[processors.reverse_dns]]\n" + sampleConfig)) + require.NoError(t, err) + + require.Len(t, c.Processors, 1) +}
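
The `ReverseDNSCache` added by this PR is self-contained, so it can also be exercised outside the processor. Below is a minimal sketch, assuming a throwaway `main` package inside the Telegraf module so the plugin package is importable; the IP and the printed hostname are only illustrative.

```go
package main

import (
	"fmt"
	"log"
	"time"

	reversedns "github.com/influxdata/telegraf/plugins/processors/reverse_dns"
)

func main() {
	// Mirror the sample config defaults: 24h cache TTL, 3s lookup timeout,
	// at most 10 rDNS requests in flight.
	cache := reversedns.NewReverseDNSCache(24*time.Hour, 3*time.Second, 10)
	defer cache.Stop()

	// Lookup blocks until the answer arrives or the lookup timeout elapses,
	// in which case it returns ErrTimeout and the metric would pass unaltered.
	names, err := cache.Lookup("8.8.8.8")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(names) // e.g. [dns.google.]

	// A second lookup for the same IP is answered from the cache and
	// shows up in the hit counter.
	if _, err := cache.Lookup("8.8.8.8"); err == nil {
		fmt.Println("cache hits:", cache.Stats().CacheHit)
	}
}
```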
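The README and sample config both point at `processors.converter` for turning the looked-up tag into a field. A hedged sketch of that chain follows; the `order` values and the converter's tags-to-string-field behavior are assumptions to check against the converter plugin's README, and are not part of this PR.

```toml
[[processors.reverse_dns]]
  order = 1
  [[processors.reverse_dns.lookup]]
    tag = "destination_ip"
    dest = "destination_name"

# Assumed follow-up: emit destination_name as a string field instead of a tag.
[[processors.converter]]
  order = 2
  [processors.converter.tags]
    string = ["destination_name"]
```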