reverse dns lookup processor (#7639)
parent d75ca67e47
commit 9190f2e659
go.mod
@@ -131,7 +131,7 @@ require (
 	golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
 	golang.org/x/net v0.0.0-20200301022130-244492dfa37a
 	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
-	golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a // indirect
+	golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
 	golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4
 	golang.org/x/tools v0.0.0-20200317043434-63da46f3035e // indirect
 	golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4
@@ -16,6 +16,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/processors/printer"
 	_ "github.com/influxdata/telegraf/plugins/processors/regex"
 	_ "github.com/influxdata/telegraf/plugins/processors/rename"
+	_ "github.com/influxdata/telegraf/plugins/processors/reverse_dns"
 	_ "github.com/influxdata/telegraf/plugins/processors/s2geo"
 	_ "github.com/influxdata/telegraf/plugins/processors/starlark"
 	_ "github.com/influxdata/telegraf/plugins/processors/strings"
@@ -0,0 +1,72 @@
# Reverse DNS Processor Plugin

The `reverse_dns` processor does a reverse-dns lookup on tags (or fields) with
IPs in them.

### Configuration:

```toml
[[processors.reverse_dns]]
  ## For optimal performance, you may want to limit which metrics are passed to this
  ## processor. eg:
  ## namepass = ["my_metric_*"]

  ## cache_ttl is how long the dns entries should stay cached for.
  ## generally longer is better, but if you expect a large number of diverse lookups
  ## you'll want to consider memory use.
  cache_ttl = "24h"

  ## lookup_timeout is how long to wait for a single dns request to respond.
  ## this is also the maximum acceptable latency for a metric travelling through
  ## the reverse_dns processor. After lookup_timeout is exceeded, a metric will
  ## be passed on unaltered.
  ## multiple simultaneous resolution requests for the same IP will only make a
  ## single rDNS request, and they will all wait for the answer for this long.
  lookup_timeout = "3s"

  ## max_parallel_lookups is the maximum number of dns requests to be in flight
  ## at the same time. Requests hitting cached values do not count against this
  ## total, and neither do multiple requests for the same IP.
  ## It's probably best to keep this number fairly low.
  max_parallel_lookups = 10

  ## ordered controls whether or not the metrics need to stay in the same order
  ## this plugin received them in. If false, this plugin will change the order
  ## with requests hitting cached results moving through immediately and not
  ## waiting on slower lookups. This may cause issues for you if you are
  ## depending on the order of metrics staying the same. If so, set this to true.
  ## Keeping the metrics ordered may be slightly slower.
  ordered = false

  [[processors.reverse_dns.lookup]]
    ## get the ip from the field "source_ip", and put the result in the field "source_name"
    field = "source_ip"
    dest = "source_name"

  [[processors.reverse_dns.lookup]]
    ## get the ip from the tag "destination_ip", and put the result in the tag
    ## "destination_name".
    tag = "destination_ip"
    dest = "destination_name"

    ## If you would prefer destination_name to be a field instead, you can use a
    ## processors.converter after this one, specifying the order attribute
    ## (see the sketch after this block).
```
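If you take that route, the follow-up converter could look roughly like the sketch below; the `order` value and the tag-to-field type mapping are illustrative assumptions, not part of this plugin:

```toml
# hypothetical follow-up config: turn the destination_name tag produced above
# into a string field, running after reverse_dns.
[[processors.converter]]
  order = 2
  [processors.converter.tags]
    string = ["destination_name"]
```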

### Example processing:

example config:

```toml
[[processors.reverse_dns]]
  [[processors.reverse_dns.lookup]]
    tag = "ip"
    dest = "domain"
```

```diff
- ping,ip=8.8.8.8 elapsed=300i 1502489900000000000
+ ping,ip=8.8.8.8,domain=dns.google. elapsed=300i 1502489900000000000
```
@@ -0,0 +1,89 @@
package parallel

import (
    "sync"

    "github.com/influxdata/telegraf"
)

type Ordered struct {
    wg sync.WaitGroup
    fn func(telegraf.Metric) []telegraf.Metric

    // queue of jobs coming in. Workers pick jobs off this queue for processing
    workerQueue chan job

    // queue of ordered metrics going out
    queue chan futureMetric
}

func NewOrdered(
    acc telegraf.Accumulator,
    fn func(telegraf.Metric) []telegraf.Metric,
    orderedQueueSize int,
    workerCount int,
) *Ordered {
    p := &Ordered{
        fn:          fn,
        workerQueue: make(chan job, workerCount),
        queue:       make(chan futureMetric, orderedQueueSize),
    }
    p.startWorkers(workerCount)
    p.wg.Add(1)
    go func() {
        p.readQueue(acc)
        p.wg.Done()
    }()
    return p
}

func (p *Ordered) Enqueue(metric telegraf.Metric) {
    future := make(futureMetric)
    p.queue <- future

    // write the future to the worker pool. Order doesn't matter now because the
    // outgoing p.queue will enforce order regardless of the order the jobs are
    // completed in
    p.workerQueue <- job{
        future: future,
        metric: metric,
    }
}

func (p *Ordered) readQueue(acc telegraf.Accumulator) {
    // wait for the response from each worker in order
    for mCh := range p.queue {
        // allow each worker to write out multiple metrics
        for metrics := range mCh {
            for _, m := range metrics {
                acc.AddMetric(m)
            }
        }
    }
}

func (p *Ordered) startWorkers(count int) {
    p.wg.Add(count)
    for i := 0; i < count; i++ {
        go func() {
            for job := range p.workerQueue {
                job.future <- p.fn(job.metric)
                close(job.future)
            }
            p.wg.Done()
        }()
    }
}

func (p *Ordered) Stop() {
    close(p.queue)
    close(p.workerQueue)
    p.wg.Wait()
}

type futureMetric chan []telegraf.Metric

type job struct {
    future futureMetric
    metric telegraf.Metric
}
@@ -0,0 +1,8 @@
package parallel

import "github.com/influxdata/telegraf"

type Parallel interface {
    Enqueue(telegraf.Metric)
    Stop()
}
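For orientation, here is a minimal sketch of driving either implementation through the `parallel.Parallel` interface. It is not part of the change: the `main` packaging is hypothetical, and the constructor arguments simply mirror the tests below.

```go
package main

import (
    "time"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/metric"
    "github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
    "github.com/influxdata/telegraf/testutil"
)

func main() {
    acc := &testutil.Accumulator{}
    // the processing function; here it just passes the metric through unchanged.
    fn := func(m telegraf.Metric) []telegraf.Metric { return []telegraf.Metric{m} }

    // Ordered preserves output order using a buffer of pending futures;
    // parallel.NewUnordered(acc, fn, 10) would trade ordering for simplicity.
    var p parallel.Parallel = parallel.NewOrdered(acc, fn, 10000, 10)

    m, _ := metric.New("example", map[string]string{},
        map[string]interface{}{"val": 1}, time.Now())
    p.Enqueue(m) // blocks when the worker queue is full
    p.Stop()     // close the queues and wait for results to drain into acc
}
```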
@@ -0,0 +1,119 @@
package parallel_test

import (
    "fmt"
    "testing"
    "time"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/metric"
    "github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
    "github.com/influxdata/telegraf/testutil"
    "github.com/stretchr/testify/require"
)

func TestOrderedJobsStayOrdered(t *testing.T) {
    acc := &testutil.Accumulator{}

    p := parallel.NewOrdered(acc, jobFunc, 10000, 10)
    now := time.Now()
    for i := 0; i < 20000; i++ {
        m, err := metric.New("test",
            map[string]string{},
            map[string]interface{}{
                "val": i,
            },
            now,
        )
        require.NoError(t, err)
        now = now.Add(1)
        p.Enqueue(m)
    }
    p.Stop()

    i := 0
    require.Len(t, acc.Metrics, 20000, fmt.Sprintf("expected 20k metrics but got %d", len(acc.GetTelegrafMetrics())))
    for _, m := range acc.GetTelegrafMetrics() {
        v, ok := m.GetField("val")
        require.True(t, ok)
        require.EqualValues(t, i, v)
        i++
    }
}

func TestUnorderedJobsDontDropAnyJobs(t *testing.T) {
    acc := &testutil.Accumulator{}

    p := parallel.NewUnordered(acc, jobFunc, 10)

    now := time.Now()

    expectedTotal := 0
    for i := 0; i < 20000; i++ {
        expectedTotal += i
        m, err := metric.New("test",
            map[string]string{},
            map[string]interface{}{
                "val": i,
            },
            now,
        )
        require.NoError(t, err)
        now = now.Add(1)
        p.Enqueue(m)
    }
    p.Stop()

    actualTotal := int64(0)
    require.Len(t, acc.Metrics, 20000, fmt.Sprintf("expected 20k metrics but got %d", len(acc.GetTelegrafMetrics())))
    for _, m := range acc.GetTelegrafMetrics() {
        v, ok := m.GetField("val")
        require.True(t, ok)
        actualTotal += v.(int64)
    }
    require.EqualValues(t, expectedTotal, actualTotal)
}

func BenchmarkOrdered(b *testing.B) {
    acc := &testutil.Accumulator{}

    p := parallel.NewOrdered(acc, jobFunc, 10000, 10)

    m, _ := metric.New("test",
        map[string]string{},
        map[string]interface{}{
            "val": 1,
        },
        time.Now(),
    )

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        p.Enqueue(m)
    }
    p.Stop()
}

func BenchmarkUnordered(b *testing.B) {
    acc := &testutil.Accumulator{}

    p := parallel.NewUnordered(acc, jobFunc, 10)

    m, _ := metric.New("test",
        map[string]string{},
        map[string]interface{}{
            "val": 1,
        },
        time.Now(),
    )

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        p.Enqueue(m)
    }
    p.Stop()
}

func jobFunc(m telegraf.Metric) []telegraf.Metric {
    return []telegraf.Metric{m}
}
@@ -0,0 +1,60 @@
package parallel

import (
    "sync"

    "github.com/influxdata/telegraf"
)

type Unordered struct {
    wg      sync.WaitGroup
    acc     telegraf.Accumulator
    fn      func(telegraf.Metric) []telegraf.Metric
    inQueue chan telegraf.Metric
}

func NewUnordered(
    acc telegraf.Accumulator,
    fn func(telegraf.Metric) []telegraf.Metric,
    workerCount int,
) *Unordered {
    p := &Unordered{
        acc:     acc,
        inQueue: make(chan telegraf.Metric, workerCount),
        fn:      fn,
    }

    // start workers
    p.wg.Add(1)
    go func() {
        p.startWorkers(workerCount)
        p.wg.Done()
    }()

    return p
}

func (p *Unordered) startWorkers(count int) {
    wg := sync.WaitGroup{}
    wg.Add(count)
    for i := 0; i < count; i++ {
        go func() {
            for metric := range p.inQueue {
                for _, m := range p.fn(metric) {
                    p.acc.AddMetric(m)
                }
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

func (p *Unordered) Stop() {
    close(p.inQueue)
    p.wg.Wait()
}

func (p *Unordered) Enqueue(m telegraf.Metric) {
    p.inQueue <- m
}
@@ -0,0 +1,319 @@
package reverse_dns

import (
    "context"
    "errors"
    "net"
    "sync"
    "sync/atomic"
    "time"

    "golang.org/x/sync/semaphore"
)

const defaultMaxWorkers = 10

var (
    ErrTimeout = errors.New("request timed out")
)

// AnyResolver is for the net.Resolver
type AnyResolver interface {
    LookupAddr(ctx context.Context, addr string) (names []string, err error)
}

// ReverseDNSCache is safe to use across multiple goroutines.
// if multiple goroutines request the same IP at the same time, one of the
// requests will trigger the lookup and the rest will wait for its response.
type ReverseDNSCache struct {
    Resolver AnyResolver
    stats    RDNSCacheStats

    // settings
    ttl           time.Duration
    lookupTimeout time.Duration
    maxWorkers    int

    // internal
    rwLock              sync.RWMutex
    sem                 *semaphore.Weighted
    cancelCleanupWorker context.CancelFunc

    cache map[string]*dnslookup

    // keep an ordered list of what needs to be worked on and what is due to expire.
    // We can use this list for both with a job position marker, and by popping items
    // off the list as they expire. This avoids iterating over the whole map to find
    // things to do.
    // As a bonus, we only have to read the first item to know if anything in the
    // map has expired.
    // must lock to get access to this.
    expireList     []*dnslookup
    expireListLock sync.Mutex
}

type RDNSCacheStats struct {
    CacheHit          uint64
    CacheMiss         uint64
    CacheExpire       uint64
    RequestsAbandoned uint64
    RequestsFilled    uint64
}

func NewReverseDNSCache(ttl, lookupTimeout time.Duration, workerPoolSize int) *ReverseDNSCache {
    if workerPoolSize <= 0 {
        workerPoolSize = defaultMaxWorkers
    }
    ctx, cancel := context.WithCancel(context.Background())
    d := &ReverseDNSCache{
        ttl:                 ttl,
        lookupTimeout:       lookupTimeout,
        cache:               map[string]*dnslookup{},
        expireList:          []*dnslookup{},
        maxWorkers:          workerPoolSize,
        sem:                 semaphore.NewWeighted(int64(workerPoolSize)),
        cancelCleanupWorker: cancel,
        Resolver:            net.DefaultResolver,
    }
    d.startCleanupWorker(ctx)
    return d
}

// dnslookup represents a lookup request/response. It may or may not be answered yet.
// interested parties register themselves with existing requests or create new ones
// to get their dns query answered. Answers will be pushed out to callbacks.
type dnslookup struct {
    ip        string // keep a copy for the expireList.
    domains   []string
    expiresAt time.Time
    completed bool
    callbacks []callbackChannelType
}

type lookupResult struct {
    domains []string
    err     error
}

type callbackChannelType chan lookupResult

// Lookup takes a string representing a parseable ipv4 or ipv6 IP, and blocks
// until it has resolved to 0-n results, or until its lookup timeout has elapsed.
// if the lookup timeout elapses, it returns an empty result and ErrTimeout.
func (d *ReverseDNSCache) Lookup(ip string) ([]string, error) {
    if len(ip) == 0 {
        return nil, nil
    }
    return d.lookup(ip)
}

func (d *ReverseDNSCache) lookup(ip string) ([]string, error) {
    // check if the value is cached
    d.rwLock.RLock()
    result, found := d.lockedGetFromCache(ip)
    if found && result.completed && result.expiresAt.After(time.Now()) {
        defer d.rwLock.RUnlock()
        atomic.AddUint64(&d.stats.CacheHit, 1)
        // cache is valid
        return result.domains, nil
    }
    d.rwLock.RUnlock()

    // if it's not cached, kick off a lookup job and subscribe to the result.
    lookupChan := d.subscribeTo(ip)
    timer := time.NewTimer(d.lookupTimeout)
    defer timer.Stop()

    // timer is still necessary even if doLookup respects timeout due to worker
    // pool starvation.
    select {
    case result := <-lookupChan:
        return result.domains, result.err
    case <-timer.C:
        return nil, ErrTimeout
    }
}

func (d *ReverseDNSCache) subscribeTo(ip string) callbackChannelType {
    callback := make(callbackChannelType, 1)

    d.rwLock.Lock()
    defer d.rwLock.Unlock()

    // confirm it's still not in the cache. This needs to be done under an active lock.
    result, found := d.lockedGetFromCache(ip)
    if found {
        atomic.AddUint64(&d.stats.CacheHit, 1)
        // has the request been answered since we last checked?
        if result.completed {
            // we can return the answer with the channel.
            callback <- lookupResult{domains: result.domains}
            return callback
        }
        // there's a request but it hasn't been answered yet;
        // add yourself to the subscribers and return that.
        result.callbacks = append(result.callbacks, callback)
        d.lockedSaveToCache(result)
        return callback
    }

    atomic.AddUint64(&d.stats.CacheMiss, 1)

    // otherwise we need to register the request
    l := &dnslookup{
        ip:        ip,
        expiresAt: time.Now().Add(d.ttl),
        callbacks: []callbackChannelType{callback},
    }

    d.lockedSaveToCache(l)
    go d.doLookup(l.ip)
    return callback
}

// lockedGetFromCache fetches from the correct internal ip cache.
// you MUST first do a read or write lock before calling it, and keep locks around
// the dnslookup that is returned until you clone it.
func (d *ReverseDNSCache) lockedGetFromCache(ip string) (lookup *dnslookup, found bool) {
    lookup, found = d.cache[ip]
    if found && lookup.expiresAt.Before(time.Now()) {
        return nil, false
    }
    return lookup, found
}

// lockedSaveToCache stores a lookup in the correct internal ip cache.
// you MUST first do a write lock before calling it.
func (d *ReverseDNSCache) lockedSaveToCache(lookup *dnslookup) {
    if lookup.expiresAt.Before(time.Now()) {
        return // don't cache.
    }
    d.cache[lookup.ip] = lookup
}

func (d *ReverseDNSCache) startCleanupWorker(ctx context.Context) {
    go func() {
        cleanupTick := time.NewTicker(10 * time.Second)
        for {
            select {
            case <-cleanupTick.C:
                d.cleanup()
            case <-ctx.Done():
                return
            }
        }
    }()
}

func (d *ReverseDNSCache) doLookup(ip string) {
    ctx, cancel := context.WithTimeout(context.Background(), d.lookupTimeout)
    defer cancel()
    if err := d.sem.Acquire(ctx, 1); err != nil {
        // lookup timeout
        d.abandonLookup(ip, ErrTimeout)
        return
    }
    defer d.sem.Release(1)

    names, err := d.Resolver.LookupAddr(ctx, ip)
    if err != nil {
        d.abandonLookup(ip, err)
        return
    }

    d.rwLock.Lock()
    lookup, found := d.lockedGetFromCache(ip)
    if !found {
        d.rwLock.Unlock()
        return
    }

    lookup.domains = names
    lookup.completed = true
    lookup.expiresAt = time.Now().Add(d.ttl) // extend the ttl now that we have a reply.
    callbacks := lookup.callbacks
    lookup.callbacks = nil

    d.lockedSaveToCache(lookup)
    d.rwLock.Unlock()

    d.expireListLock.Lock()
    // add it to the expireList.
    d.expireList = append(d.expireList, lookup)
    d.expireListLock.Unlock()

    atomic.AddUint64(&d.stats.RequestsFilled, uint64(len(callbacks)))
    for _, cb := range callbacks {
        cb <- lookupResult{domains: names}
        close(cb)
    }
}

func (d *ReverseDNSCache) abandonLookup(ip string, err error) {
    d.rwLock.Lock()
    lookup, found := d.lockedGetFromCache(ip)
    if !found {
        d.rwLock.Unlock()
        return
    }

    callbacks := lookup.callbacks
    delete(d.cache, lookup.ip)
    d.rwLock.Unlock()
    // resolve the remaining callbacks to free the resources.
    atomic.AddUint64(&d.stats.RequestsAbandoned, uint64(len(callbacks)))
    for _, cb := range callbacks {
        cb <- lookupResult{err: err}
        close(cb)
    }
}

func (d *ReverseDNSCache) cleanup() {
    now := time.Now()
    d.expireListLock.Lock()
    if len(d.expireList) == 0 {
        d.expireListLock.Unlock()
        return
    }
    ipsToDelete := []string{}
    for i := 0; i < len(d.expireList); i++ {
        if d.expireList[i].expiresAt.After(now) {
            break // done. Nothing after this point is expired.
        }
        ipsToDelete = append(ipsToDelete, d.expireList[i].ip)
    }
    if len(ipsToDelete) == 0 {
        d.expireListLock.Unlock()
        return
    }
    d.expireList = d.expireList[len(ipsToDelete):]
    d.expireListLock.Unlock()

    atomic.AddUint64(&d.stats.CacheExpire, uint64(len(ipsToDelete)))

    d.rwLock.Lock()
    defer d.rwLock.Unlock()
    for _, ip := range ipsToDelete {
        delete(d.cache, ip)
    }
}

// blockAllWorkers is a test function that eats up all the worker pool space to
// make sure workers are done running and there's no room to acquire a new worker.
func (d *ReverseDNSCache) blockAllWorkers() {
    d.sem.Acquire(context.Background(), int64(d.maxWorkers))
}

func (d *ReverseDNSCache) Stats() RDNSCacheStats {
    stats := RDNSCacheStats{}
    stats.CacheHit = atomic.LoadUint64(&d.stats.CacheHit)
    stats.CacheMiss = atomic.LoadUint64(&d.stats.CacheMiss)
    stats.CacheExpire = atomic.LoadUint64(&d.stats.CacheExpire)
    stats.RequestsAbandoned = atomic.LoadUint64(&d.stats.RequestsAbandoned)
    stats.RequestsFilled = atomic.LoadUint64(&d.stats.RequestsFilled)
    return stats
}

func (d *ReverseDNSCache) Stop() {
    d.cancelCleanupWorker()
}
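For reference, a compact sketch of using the cache's exported surface on its own, ahead of the tests that exercise it below. The `main` wrapper is hypothetical and the durations simply echo the sample config; by default lookups go through net.DefaultResolver.

```go
package main

import (
    "fmt"
    "time"

    "github.com/influxdata/telegraf/plugins/processors/reverse_dns"
)

func main() {
    // 24h cache TTL, 3s per-lookup timeout, 10 parallel workers.
    cache := reverse_dns.NewReverseDNSCache(24*time.Hour, 3*time.Second, 10)
    defer cache.Stop()

    names, err := cache.Lookup("8.8.8.8")
    if err != nil {
        fmt.Println("lookup failed:", err) // e.g. reverse_dns.ErrTimeout
        return
    }
    fmt.Println(names)
    fmt.Printf("%+v\n", cache.Stats())
}
```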
@@ -0,0 +1,136 @@
package reverse_dns

import (
    "context"
    "errors"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestSimpleReverseDNSLookup(t *testing.T) {
    d := NewReverseDNSCache(60*time.Second, 1*time.Second, -1)
    defer d.Stop()

    d.Resolver = &localResolver{}
    answer, err := d.Lookup("127.0.0.1")
    require.NoError(t, err)
    require.Equal(t, []string{"localhost"}, answer)
    d.blockAllWorkers()

    // do another request with no workers available.
    // it should read from cache instantly.
    answer, err = d.Lookup("127.0.0.1")
    require.NoError(t, err)
    require.Equal(t, []string{"localhost"}, answer)

    require.Len(t, d.cache, 1)
    require.Len(t, d.expireList, 1)
    d.cleanup()
    require.Len(t, d.expireList, 1) // ttl hasn't hit yet.

    stats := d.Stats()

    require.EqualValues(t, 0, stats.CacheExpire)
    require.EqualValues(t, 1, stats.CacheMiss)
    require.EqualValues(t, 1, stats.CacheHit)
    require.EqualValues(t, 1, stats.RequestsFilled)
    require.EqualValues(t, 0, stats.RequestsAbandoned)
}

func TestParallelReverseDNSLookup(t *testing.T) {
    d := NewReverseDNSCache(1*time.Second, 1*time.Second, -1)
    defer d.Stop()

    d.Resolver = &localResolver{}
    var answer1 []string
    var answer2 []string
    wg := &sync.WaitGroup{}
    wg.Add(2)
    go func() {
        answer, err := d.Lookup("127.0.0.1")
        require.NoError(t, err)
        answer1 = answer
        wg.Done()
    }()
    go func() {
        answer, err := d.Lookup("127.0.0.1")
        require.NoError(t, err)
        answer2 = answer
        wg.Done()
    }()

    wg.Wait()

    t.Log(answer1)
    t.Log(answer2)

    require.Equal(t, []string{"localhost"}, answer1)
    require.Equal(t, []string{"localhost"}, answer2)

    require.Len(t, d.cache, 1)

    stats := d.Stats()

    require.EqualValues(t, 1, stats.CacheMiss)
    require.EqualValues(t, 1, stats.CacheHit)
}

func TestUnavailableDNSServerRespectsTimeout(t *testing.T) {
    d := NewReverseDNSCache(0, 1, -1)
    defer d.Stop()

    d.Resolver = &timeoutResolver{}

    result, err := d.Lookup("192.153.33.3")
    require.Error(t, err)
    require.Equal(t, ErrTimeout, err)

    require.Nil(t, result)
}

func TestCleanupHappens(t *testing.T) {
    ttl := 100 * time.Millisecond
    d := NewReverseDNSCache(ttl, 1*time.Second, -1)
    defer d.Stop()

    d.Resolver = &localResolver{}
    _, err := d.Lookup("127.0.0.1")
    require.NoError(t, err)

    require.Len(t, d.cache, 1)

    time.Sleep(ttl) // wait for cache entry to expire.
    d.cleanup()
    require.Len(t, d.expireList, 0)

    stats := d.Stats()

    require.EqualValues(t, 1, stats.CacheExpire)
    require.EqualValues(t, 1, stats.CacheMiss)
    require.EqualValues(t, 0, stats.CacheHit)
}

func TestLookupTimeout(t *testing.T) {
    d := NewReverseDNSCache(10*time.Second, 10*time.Second, -1)
    defer d.Stop()

    d.Resolver = &timeoutResolver{}
    _, err := d.Lookup("127.0.0.1")
    require.Error(t, err)
    require.EqualValues(t, 1, d.Stats().RequestsAbandoned)
}

type timeoutResolver struct{}

func (r *timeoutResolver) LookupAddr(ctx context.Context, addr string) (names []string, err error) {
    return nil, errors.New("timeout")
}

type localResolver struct{}

func (r *localResolver) LookupAddr(ctx context.Context, addr string) (names []string, err error) {
    return []string{"localhost"}, nil
}
@@ -0,0 +1,156 @@
package reverse_dns

import (
    "time"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/config"
    "github.com/influxdata/telegraf/plugins/processors"
    "github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
)

const sampleConfig = `
  ## For optimal performance, you may want to limit which metrics are passed to this
  ## processor. eg:
  ## namepass = ["my_metric_*"]

  ## cache_ttl is how long the dns entries should stay cached for.
  ## generally longer is better, but if you expect a large number of diverse lookups
  ## you'll want to consider memory use.
  cache_ttl = "24h"

  ## lookup_timeout is how long to wait for a single dns request to respond.
  ## this is also the maximum acceptable latency for a metric travelling through
  ## the reverse_dns processor. After lookup_timeout is exceeded, a metric will
  ## be passed on unaltered.
  ## multiple simultaneous resolution requests for the same IP will only make a
  ## single rDNS request, and they will all wait for the answer for this long.
  lookup_timeout = "3s"

  ## max_parallel_lookups is the maximum number of dns requests to be in flight
  ## at the same time. Requests hitting cached values do not count against this
  ## total, and neither do multiple requests for the same IP.
  ## It's probably best to keep this number fairly low.
  max_parallel_lookups = 10

  ## ordered controls whether or not the metrics need to stay in the same order
  ## this plugin received them in. If false, this plugin will change the order
  ## with requests hitting cached results moving through immediately and not
  ## waiting on slower lookups. This may cause issues for you if you are
  ## depending on the order of metrics staying the same. If so, set this to true.
  ## Keeping the metrics ordered may be slightly slower.
  ordered = false

  [[processors.reverse_dns.lookup]]
    ## get the ip from the field "source_ip", and put the result in the field "source_name"
    field = "source_ip"
    dest = "source_name"

  [[processors.reverse_dns.lookup]]
    ## get the ip from the tag "destination_ip", and put the result in the tag
    ## "destination_name".
    tag = "destination_ip"
    dest = "destination_name"

    ## If you would prefer destination_name to be a field instead, you can use a
    ## processors.converter after this one, specifying the order attribute.
`

type lookupEntry struct {
    Tag   string `toml:"tag"`
    Field string `toml:"field"`
    Dest  string `toml:"dest"`
}

type ReverseDNS struct {
    reverseDNSCache *ReverseDNSCache
    acc             telegraf.Accumulator
    parallel        parallel.Parallel

    Lookups            []lookupEntry   `toml:"lookup"`
    CacheTTL           config.Duration `toml:"cache_ttl"`
    LookupTimeout      config.Duration `toml:"lookup_timeout"`
    MaxParallelLookups int             `toml:"max_parallel_lookups"`
    Ordered            bool            `toml:"ordered"`
    Log                telegraf.Logger `toml:"-"`
}

func (r *ReverseDNS) SampleConfig() string {
    return sampleConfig
}

func (r *ReverseDNS) Description() string {
    return "ReverseDNS does a reverse lookup on IP addresses to retrieve the DNS name"
}

func (r *ReverseDNS) Start(acc telegraf.Accumulator) error {
    r.acc = acc
    r.reverseDNSCache = NewReverseDNSCache(
        time.Duration(r.CacheTTL),
        time.Duration(r.LookupTimeout),
        r.MaxParallelLookups, // max parallel reverse-dns lookups
    )
    if r.Ordered {
        r.parallel = parallel.NewOrdered(acc, r.asyncAdd, 10000, r.MaxParallelLookups)
    } else {
        r.parallel = parallel.NewUnordered(acc, r.asyncAdd, r.MaxParallelLookups)
    }
    return nil
}

func (r *ReverseDNS) Stop() error {
    r.parallel.Stop()
    r.reverseDNSCache.Stop()
    return nil
}

func (r *ReverseDNS) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
    r.parallel.Enqueue(metric)
    return nil
}

func (r *ReverseDNS) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
    for _, lookup := range r.Lookups {
        if len(lookup.Field) > 0 {
            if ipField, ok := metric.GetField(lookup.Field); ok {
                if ip, ok := ipField.(string); ok {
                    result, err := r.reverseDNSCache.Lookup(ip)
                    if err != nil {
                        r.Log.Errorf("lookup error: %v", err)
                        continue
                    }
                    if len(result) > 0 {
                        metric.AddField(lookup.Dest, result[0])
                    }
                }
            }
        }
        if len(lookup.Tag) > 0 {
            if ipTag, ok := metric.GetTag(lookup.Tag); ok {
                result, err := r.reverseDNSCache.Lookup(ipTag)
                if err != nil {
                    r.Log.Errorf("lookup error: %v", err)
                    continue
                }
                if len(result) > 0 {
                    metric.AddTag(lookup.Dest, result[0])
                }
            }
        }
    }
    return []telegraf.Metric{metric}
}

func init() {
    processors.AddStreaming("reverse_dns", func() telegraf.StreamingProcessor {
        return newReverseDNS()
    })
}

func newReverseDNS() *ReverseDNS {
    return &ReverseDNS{
        CacheTTL:           config.Duration(24 * time.Hour),
        LookupTimeout:      config.Duration(1 * time.Minute),
        MaxParallelLookups: 10,
    }
}
@@ -0,0 +1,55 @@
package reverse_dns

import (
    "testing"
    "time"

    "github.com/influxdata/telegraf/config"
    "github.com/influxdata/telegraf/metric"
    "github.com/influxdata/telegraf/testutil"
    "github.com/stretchr/testify/require"
)

func TestSimpleReverseLookup(t *testing.T) {
    now := time.Now()
    m, _ := metric.New("name", map[string]string{
        "dest_ip": "8.8.8.8",
    }, map[string]interface{}{
        "source_ip": "127.0.0.1",
    }, now)

    dns := newReverseDNS()
    dns.Lookups = []lookupEntry{
        {
            Field: "source_ip",
            Dest:  "source_name",
        },
        {
            Tag:  "dest_ip",
            Dest: "dest_name",
        },
    }
    acc := &testutil.Accumulator{}
    dns.Start(acc)
    dns.Add(m, acc)
    dns.Stop()
    // should be processed now.

    require.Len(t, acc.GetTelegrafMetrics(), 1)
    processedMetric := acc.GetTelegrafMetrics()[0]
    f, ok := processedMetric.GetField("source_name")
    require.True(t, ok)
    require.EqualValues(t, "localhost", f)

    tag, ok := processedMetric.GetTag("dest_name")
    require.True(t, ok)
    require.EqualValues(t, "dns.google.", tag)
}

func TestLoadingConfig(t *testing.T) {
    c := config.NewConfig()
    err := c.LoadConfigData([]byte("[[processors.reverse_dns]]\n" + sampleConfig))
    require.NoError(t, err)

    require.Len(t, c.Processors, 1)
}