chore: Fix linter findings for `revive:exported` in `plugins/inputs/[w-z]*` (#16703)

This commit is contained in:
Paweł Żak 2025-04-01 23:17:51 +02:00 committed by GitHub
parent 49dc1270a3
commit ad76475f03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 412 additions and 477 deletions

View File

@ -16,11 +16,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
measurementDevice = "wireguard_device"
measurementPeer = "wireguard_peer"
)
var ( var (
deviceTypeNames = map[wgtypes.DeviceType]string{ deviceTypeNames = map[wgtypes.DeviceType]string{
wgtypes.Unknown: "unknown", wgtypes.Unknown: "unknown",
@ -29,8 +24,11 @@ var (
} }
) )
// Wireguard is an input that enumerates all Wireguard interfaces/devices on const (
// the host, and reports gauge metrics for the device itself and its peers. measurementDevice = "wireguard_device"
measurementPeer = "wireguard_peer"
)
type Wireguard struct { type Wireguard struct {
Devices []string `toml:"devices"` Devices []string `toml:"devices"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
@ -44,7 +42,6 @@ func (*Wireguard) SampleConfig() string {
func (wg *Wireguard) Init() error { func (wg *Wireguard) Init() error {
var err error var err error
wg.client, err = wgctrl.New() wg.client, err = wgctrl.New()
return err return err

View File

@ -11,7 +11,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Wireless is used to store configuration values.
type Wireless struct { type Wireless struct {
HostProc string `toml:"host_proc"` HostProc string `toml:"host_proc"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`

View File

@ -14,11 +14,11 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
var newLineByte = []byte("\n")
// length of wireless interface fields // length of wireless interface fields
const interfaceFieldLength = 10 const interfaceFieldLength = 10
var newLineByte = []byte("\n")
type wirelessInterface struct { type wirelessInterface struct {
Interface string Interface string
Status int64 Status int64
@ -33,7 +33,6 @@ type wirelessInterface struct {
Beacon int64 Beacon int64
} }
// Gather collects the wireless information.
func (w *Wireless) Gather(acc telegraf.Accumulator) error { func (w *Wireless) Gather(acc telegraf.Accumulator) error {
// load proc path, get default value if config value and env variable are empty // load proc path, get default value if config value and env variable are empty
if w.HostProc == "" { if w.HostProc == "" {

View File

@ -12,7 +12,7 @@ func (w *Wireless) Init() error {
return nil return nil
} }
func (*Wireless) Gather(_ telegraf.Accumulator) error { func (*Wireless) Gather(telegraf.Accumulator) error {
return nil return nil
} }

View File

@ -38,7 +38,6 @@ var sampleConfig string
// Regexp for handling file URIs containing a drive letter and leading slash // Regexp for handling file URIs containing a drive letter and leading slash
var reDriveLetter = regexp.MustCompile(`^/([a-zA-Z]:/)`) var reDriveLetter = regexp.MustCompile(`^/([a-zA-Z]:/)`)
// X509Cert holds the configuration of the plugin.
type X509Cert struct { type X509Cert struct {
Sources []string `toml:"sources"` Sources []string `toml:"sources"`
Timeout config.Duration `toml:"timeout"` Timeout config.Duration `toml:"timeout"`
@ -93,7 +92,6 @@ func (c *X509Cert) Init() error {
return nil return nil
} }
// Gather adds metrics into the accumulator.
func (c *X509Cert) Gather(acc telegraf.Accumulator) error { func (c *X509Cert) Gather(acc telegraf.Accumulator) error {
now := time.Now() now := time.Now()

View File

@ -10,23 +10,14 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type Sysctl func(metric string) ([]string, error)
type Zpool func() ([]string, error)
type Zdataset func(properties []string) ([]string, error)
type Uname func() (string, error)
type Zfs struct { type Zfs struct {
KstatPath string KstatPath string `toml:"kstatPath"`
KstatMetrics []string KstatMetrics []string `toml:"kstatMetrics"`
PoolMetrics bool PoolMetrics bool `toml:"poolMetrics"`
DatasetMetrics bool DatasetMetrics bool `toml:"datasetMetrics"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
sysctl Sysctl //nolint:unused // False positive - this var is used for non-default build tag: freebsd helper //nolint:unused // for OS-specific usage
zpool Zpool //nolint:unused // False positive - this var is used for non-default build tag: freebsd
zdataset Zdataset //nolint:unused // False positive - this var is used for non-default build tag: freebsd
uname Uname //nolint:unused // False positive - this var is used for non-default build tag: freebsd
version int64 //nolint:unused // False positive - this var is used for non-default build tag: freebsd
} }
func (*Zfs) SampleConfig() string { func (*Zfs) SampleConfig() string {

View File

@ -15,6 +15,18 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
type helper struct {
sysctl sysctlF
zpool zpoolF
zdataset zdatasetF
uname unameF
}
type sysctlF func(metric string) ([]string, error)
type zpoolF func() ([]string, error)
type zdatasetF func(properties []string) ([]string, error)
type unameF func() (string, error)
func (z *Zfs) Init() error { func (z *Zfs) Init() error {
// Determine the kernel version to adapt parsing // Determine the kernel version to adapt parsing
release, err := z.uname() release, err := z.uname()
@ -22,7 +34,7 @@ func (z *Zfs) Init() error {
return fmt.Errorf("determining uname failed: %w", err) return fmt.Errorf("determining uname failed: %w", err)
} }
parts := strings.SplitN(release, ".", 2) parts := strings.SplitN(release, ".", 2)
z.version, err = strconv.ParseInt(parts[0], 10, 64) version, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil { if err != nil {
return fmt.Errorf("determining version from %q failed: %w", release, err) return fmt.Errorf("determining version from %q failed: %w", release, err)
} }
@ -31,7 +43,7 @@ func (z *Zfs) Init() error {
// Please note that starting from FreeBSD 14 the 'vdev_cache_stats' are // Please note that starting from FreeBSD 14 the 'vdev_cache_stats' are
// no longer available. // no longer available.
if len(z.KstatMetrics) == 0 { if len(z.KstatMetrics) == 0 {
if z.version < 14 { if version < 14 {
z.KstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"} z.KstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"}
} else { } else {
z.KstatMetrics = []string{"arcstats", "zfetchstats"} z.KstatMetrics = []string{"arcstats", "zfetchstats"}
@ -251,10 +263,12 @@ func uname() (string, error) {
func init() { func init() {
inputs.Add("zfs", func() telegraf.Input { inputs.Add("zfs", func() telegraf.Input {
return &Zfs{ return &Zfs{
sysctl: sysctl, helper: helper{
zpool: zpool, sysctl: sysctl,
zdataset: zdataset, zpool: zpool,
uname: uname, zdataset: zdataset,
uname: uname,
},
} }
}) })
} }

View File

@ -14,170 +14,21 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
type metricsVersion uint8
const ( const (
unknown metricsVersion = iota unknown metricsVersion = iota
v1 v1
v2 v2
) )
type metricsVersion uint8
type poolInfo struct { type poolInfo struct {
name string name string
ioFilename string ioFilename string
version metricsVersion version metricsVersion
} }
func probeVersion(kstatPath string) (metricsVersion, []string, error) { type helper struct{} //nolint:unused // not used for "linux" OS, needed for Zfs struct
poolsDirs, err := filepath.Glob(kstatPath + "/*/objset-*")
// From the docs: the only possible returned error is ErrBadPattern, when pattern is malformed.
// Because of this we need to determine how to fallback differently.
if err != nil {
return unknown, poolsDirs, err
}
if len(poolsDirs) > 0 {
return v2, poolsDirs, nil
}
// Fallback to the old kstat in case of an older ZFS version.
poolsDirs, err = filepath.Glob(kstatPath + "/*/io")
if err != nil {
return unknown, poolsDirs, err
}
return v1, poolsDirs, nil
}
func getPools(kstatPath string) ([]poolInfo, error) {
pools := make([]poolInfo, 0)
version, poolsDirs, err := probeVersion(kstatPath)
if err != nil {
return nil, err
}
for _, poolDir := range poolsDirs {
poolDirSplit := strings.Split(poolDir, "/")
pool := poolDirSplit[len(poolDirSplit)-2]
pools = append(pools, poolInfo{name: pool, ioFilename: poolDir, version: version})
}
return pools, nil
}
func getTags(pools []poolInfo) map[string]string {
poolNames := ""
knownPools := make(map[string]struct{})
for _, entry := range pools {
name := entry.name
if _, ok := knownPools[name]; !ok {
knownPools[name] = struct{}{}
if poolNames != "" {
poolNames += "::"
}
poolNames += name
}
}
return map[string]string{"pools": poolNames}
}
func gather(lines []string, fileLines int) (keys, values []string, err error) {
if len(lines) < fileLines {
return nil, nil, errors.New("expected lines in kstat does not match")
}
keys = strings.Fields(lines[1])
values = strings.Fields(lines[2])
if len(keys) != len(values) {
return nil, nil, fmt.Errorf("key and value count don't match Keys:%v Values:%v", keys, values)
}
return keys, values, nil
}
func gatherV1(lines []string) (map[string]interface{}, error) {
fileLines := 3
keys, values, err := gather(lines, fileLines)
if err != nil {
return nil, err
}
fields := make(map[string]interface{})
for i := 0; i < len(keys); i++ {
value, err := strconv.ParseInt(values[i], 10, 64)
if err != nil {
return nil, err
}
fields[keys[i]] = value
}
return fields, nil
}
// New way of collection. Each objset-* file in ZFS >= 2.1.x has a format looking like this:
// 36 1 0x01 7 2160 5214787391 73405258558961
// name type data
// dataset_name 7 rpool/ROOT/pve-1
// writes 4 409570
// nwritten 4 2063419969
// reads 4 22108699
// nread 4 63067280992
// nunlinks 4 13849
// nunlinked 4 13848
//
// For explanation of the first line's values see https://github.com/openzfs/zfs/blob/master/module/os/linux/spl/spl-kstat.c#L61
func gatherV2(lines []string, tags map[string]string) (map[string]interface{}, error) {
fileLines := 9
_, _, err := gather(lines, fileLines)
if err != nil {
return nil, err
}
tags["dataset"] = strings.Fields(lines[2])[2]
fields := make(map[string]interface{})
for i := 3; i < len(lines); i++ {
lineFields := strings.Fields(lines[i])
fieldName := lineFields[0]
fieldData := lineFields[2]
value, err := strconv.ParseInt(fieldData, 10, 64)
if err != nil {
return nil, err
}
fields[fieldName] = value
}
return fields, nil
}
func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error {
lines, err := internal.ReadLines(pool.ioFilename)
if err != nil {
return err
}
var fields map[string]interface{}
var gatherErr error
tags := map[string]string{"pool": pool.name}
switch pool.version {
case v1:
fields, gatherErr = gatherV1(lines)
case v2:
fields, gatherErr = gatherV2(lines, tags)
case unknown:
return errors.New("unknown metrics version detected")
}
if gatherErr != nil {
return gatherErr
}
acc.AddFields("zfs_pool", fields, tags)
return nil
}
func (z *Zfs) Gather(acc telegraf.Accumulator) error { func (z *Zfs) Gather(acc telegraf.Accumulator) error {
kstatMetrics := z.KstatMetrics kstatMetrics := z.KstatMetrics
@ -236,6 +87,157 @@ func (z *Zfs) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func getPools(kstatPath string) ([]poolInfo, error) {
pools := make([]poolInfo, 0)
version, poolsDirs, err := probeVersion(kstatPath)
if err != nil {
return nil, err
}
for _, poolDir := range poolsDirs {
poolDirSplit := strings.Split(poolDir, "/")
pool := poolDirSplit[len(poolDirSplit)-2]
pools = append(pools, poolInfo{name: pool, ioFilename: poolDir, version: version})
}
return pools, nil
}
func probeVersion(kstatPath string) (metricsVersion, []string, error) {
poolsDirs, err := filepath.Glob(kstatPath + "/*/objset-*")
// From the docs: the only possible returned error is ErrBadPattern, when pattern is malformed.
// Because of this we need to determine how to fallback differently.
if err != nil {
return unknown, poolsDirs, err
}
if len(poolsDirs) > 0 {
return v2, poolsDirs, nil
}
// Fallback to the old kstat in case of an older ZFS version.
poolsDirs, err = filepath.Glob(kstatPath + "/*/io")
if err != nil {
return unknown, poolsDirs, err
}
return v1, poolsDirs, nil
}
func getTags(pools []poolInfo) map[string]string {
poolNames := ""
knownPools := make(map[string]struct{})
for _, entry := range pools {
name := entry.name
if _, ok := knownPools[name]; !ok {
knownPools[name] = struct{}{}
if poolNames != "" {
poolNames += "::"
}
poolNames += name
}
}
return map[string]string{"pools": poolNames}
}
func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error {
lines, err := internal.ReadLines(pool.ioFilename)
if err != nil {
return err
}
var fields map[string]interface{}
var gatherErr error
tags := map[string]string{"pool": pool.name}
switch pool.version {
case v1:
fields, gatherErr = gatherV1(lines)
case v2:
fields, gatherErr = gatherV2(lines, tags)
case unknown:
return errors.New("unknown metrics version detected")
}
if gatherErr != nil {
return gatherErr
}
acc.AddFields("zfs_pool", fields, tags)
return nil
}
func gatherV1(lines []string) (map[string]interface{}, error) {
fileLines := 3
keys, values, err := gather(lines, fileLines)
if err != nil {
return nil, err
}
fields := make(map[string]interface{})
for i := 0; i < len(keys); i++ {
value, err := strconv.ParseInt(values[i], 10, 64)
if err != nil {
return nil, err
}
fields[keys[i]] = value
}
return fields, nil
}
func gather(lines []string, fileLines int) (keys, values []string, err error) {
if len(lines) < fileLines {
return nil, nil, errors.New("expected lines in kstat does not match")
}
keys = strings.Fields(lines[1])
values = strings.Fields(lines[2])
if len(keys) != len(values) {
return nil, nil, fmt.Errorf("key and value count don't match Keys:%v Values:%v", keys, values)
}
return keys, values, nil
}
// New way of collection. Each objset-* file in ZFS >= 2.1.x has a format looking like this:
// 36 1 0x01 7 2160 5214787391 73405258558961
// name type data
// dataset_name 7 rpool/ROOT/pve-1
// writes 4 409570
// nwritten 4 2063419969
// reads 4 22108699
// nread 4 63067280992
// nunlinks 4 13849
// nunlinked 4 13848
//
// For explanation of the first line's values see https://github.com/openzfs/zfs/blob/master/module/os/linux/spl/spl-kstat.c#L61
func gatherV2(lines []string, tags map[string]string) (map[string]interface{}, error) {
fileLines := 9
_, _, err := gather(lines, fileLines)
if err != nil {
return nil, err
}
tags["dataset"] = strings.Fields(lines[2])[2]
fields := make(map[string]interface{})
for i := 3; i < len(lines); i++ {
lineFields := strings.Fields(lines[i])
fieldName := lineFields[0]
fieldData := lineFields[2]
value, err := strconv.ParseInt(fieldData, 10, 64)
if err != nil {
return nil, err
}
fields[fieldName] = value
}
return fields, nil
}
func init() { func init() {
inputs.Add("zfs", func() telegraf.Input { inputs.Add("zfs", func() telegraf.Input {
return &Zfs{} return &Zfs{}

View File

@ -7,7 +7,14 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
func (*Zfs) Gather(_ telegraf.Accumulator) error { type helper struct{} //nolint:unused // not used for "other" OSes, needed for Zfs struct
func (z *Zfs) Init() error {
z.Log.Warn("Current platform is not supported")
return nil
}
func (*Zfs) Gather(telegraf.Accumulator) error {
return nil return nil
} }

View File

@ -33,29 +33,29 @@ import (
) )
var ( var (
BatchSize int batchSize int
MaxBackLog int maxBackLog int
BatchTimeInterval int batchTimeInterval int
SpanCount int spanCount int
ZipkinServerHost string zipkinServerHost string
) )
func init() { func init() {
flag.IntVar(&BatchSize, "batch_size", 10000, "") flag.IntVar(&batchSize, "batch_size", 10000, "")
flag.IntVar(&MaxBackLog, "max_backlog", 100000, "") flag.IntVar(&maxBackLog, "max_backlog", 100000, "")
flag.IntVar(&BatchTimeInterval, "batch_interval", 1, "") flag.IntVar(&batchTimeInterval, "batch_interval", 1, "")
flag.IntVar(&SpanCount, "span_count", 100000, "") flag.IntVar(&spanCount, "span_count", 100000, "")
flag.StringVar(&ZipkinServerHost, "zipkin_host", "localhost", "") flag.StringVar(&zipkinServerHost, "zipkin_host", "localhost", "")
} }
func main() { func main() {
flag.Parse() flag.Parse()
var hostname = fmt.Sprintf("http://%s:9411/api/v1/spans", ZipkinServerHost) var hostname = fmt.Sprintf("http://%s:9411/api/v1/spans", zipkinServerHost)
reporter := zipkinhttp.NewReporter( reporter := zipkinhttp.NewReporter(
hostname, hostname,
zipkinhttp.BatchSize(BatchSize), zipkinhttp.BatchSize(batchSize),
zipkinhttp.MaxBacklog(MaxBackLog), zipkinhttp.MaxBacklog(maxBackLog),
zipkinhttp.BatchInterval(time.Duration(BatchTimeInterval)*time.Second), zipkinhttp.BatchInterval(time.Duration(batchTimeInterval)*time.Second),
) )
defer reporter.Close() defer reporter.Close()
@ -71,8 +71,8 @@ func main() {
tracer := zipkinot.Wrap(nativeTracer) tracer := zipkinot.Wrap(nativeTracer)
log.Printf("Writing %d spans to zipkin server at %s\n", SpanCount, hostname) log.Printf("Writing %d spans to zipkin server at %s\n", spanCount, hostname)
for i := 0; i < SpanCount; i++ { for i := 0; i < spanCount; i++ {
parent := tracer.StartSpan("Parent") parent := tracer.StartSpan("Parent")
parent.LogFields(otlog.Message(fmt.Sprintf("Trace%d", i))) parent.LogFields(otlog.Message(fmt.Sprintf("Trace%d", i)))
parent.Finish() parent.Finish()

View File

@ -2,11 +2,10 @@ package codec
import ( import (
"errors" "errors"
"reflect"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
) )
@ -35,9 +34,7 @@ func Test_MicroToTime(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := MicroToTime(tt.micro); !reflect.DeepEqual(got, tt.want) { require.Equal(t, tt.want, MicroToTime(tt.micro))
t.Errorf("microToTime() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -110,13 +107,10 @@ func Test_minMax(t *testing.T) {
if tt.now != nil { if tt.now != nil {
now = tt.now now = tt.now
} }
got, got1 := minMax(tt.span) gotMin, gotMax := minMax(tt.span)
if !reflect.DeepEqual(got, tt.wantMin) { require.Equal(t, tt.wantMin, gotMin)
t.Errorf("minMax() got = %v, want %v", got, tt.wantMin) require.Equal(t, tt.wantMax, gotMax)
}
if !reflect.DeepEqual(got1, tt.wantMax) {
t.Errorf("minMax() got1 = %v, want %v", got1, tt.wantMax)
}
now = time.Now now = time.Now
}) })
} }
@ -179,9 +173,7 @@ func Test_guessTimestamp(t *testing.T) {
if tt.now != nil { if tt.now != nil {
now = tt.now now = tt.now
} }
if got := guessTimestamp(tt.span); !reflect.DeepEqual(got, tt.want) { require.Equal(t, tt.want, guessTimestamp(tt.span))
t.Errorf("guessTimestamp() = %v, want %v", got, tt.want)
}
now = time.Now now = time.Now
}) })
} }
@ -220,9 +212,7 @@ func Test_convertDuration(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := convertDuration(tt.span); got != tt.want { require.Equal(t, tt.want, convertDuration(tt.span))
t.Errorf("convertDuration() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -259,13 +249,12 @@ func Test_parentID(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, err := parentID(tt.span) got, err := parentID(tt.span)
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("parentID() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if got != tt.want {
t.Errorf("parentID() = %v, want %v", got, tt.want)
} }
require.Equal(t, tt.want, got)
}) })
} }
} }
@ -346,9 +335,7 @@ func Test_serviceEndpoint(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := serviceEndpoint(tt.ann, tt.bann); !reflect.DeepEqual(got, tt.want) { require.Equal(t, tt.want, serviceEndpoint(tt.ann, tt.bann))
t.Errorf("serviceEndpoint() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -388,9 +375,7 @@ func TestNewBinaryAnnotations(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := NewBinaryAnnotations(tt.annotations, tt.endpoint); !reflect.DeepEqual(got, tt.want) { require.Equal(t, tt.want, NewBinaryAnnotations(tt.annotations, tt.endpoint))
t.Errorf("NewBinaryAnnotations() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -430,9 +415,7 @@ func TestNewAnnotations(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := NewAnnotations(tt.annotations, tt.endpoint); !reflect.DeepEqual(got, tt.want) { require.Equal(t, tt.want, NewAnnotations(tt.annotations, tt.endpoint))
t.Errorf("NewAnnotations() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -524,13 +507,12 @@ func TestNewTrace(t *testing.T) {
now = tt.now now = tt.now
} }
got, err := NewTrace(tt.spans) got, err := NewTrace(tt.spans)
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("NewTrace() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if !cmp.Equal(tt.want, got) {
t.Errorf("NewTrace() = %s", cmp.Diff(tt.want, got))
} }
require.Equal(t, tt.want, got)
now = time.Now now = time.Now
}) })
} }

View File

@ -24,7 +24,7 @@ func (*JSON) Decode(octets []byte) ([]codec.Span, error) {
res := make([]codec.Span, 0, len(spans)) res := make([]codec.Span, 0, len(spans))
for i := range spans { for i := range spans {
if err := spans[i].Validate(); err != nil { if err := spans[i].validate(); err != nil {
return nil, err return nil, err
} }
res = append(res, &spans[i]) res = append(res, &spans[i])
@ -44,7 +44,7 @@ type span struct {
BAnno []binaryAnnotation `json:"binaryAnnotations"` BAnno []binaryAnnotation `json:"binaryAnnotations"`
} }
func (s *span) Validate() error { func (s *span) validate() error {
var err error var err error
check := func(f func() (string, error)) { check := func(f func() (string, error)) {
if err != nil { if err != nil {
@ -68,21 +68,21 @@ func (s *span) Trace() (string, error) {
if s.TraceID == "" { if s.TraceID == "" {
return "", errors.New("trace ID cannot be null") return "", errors.New("trace ID cannot be null")
} }
return TraceIDFromString(s.TraceID) return traceIDFromString(s.TraceID)
} }
func (s *span) SpanID() (string, error) { func (s *span) SpanID() (string, error) {
if s.ID == "" { if s.ID == "" {
return "", errors.New("span ID cannot be null") return "", errors.New("span ID cannot be null")
} }
return IDFromString(s.ID) return idFromString(s.ID)
} }
func (s *span) Parent() (string, error) { func (s *span) Parent() (string, error) {
if s.ParentID == "" { if s.ParentID == "" {
return "", nil return "", nil
} }
return IDFromString(s.ParentID) return idFromString(s.ParentID)
} }
func (s *span) Name() string { func (s *span) Name() string {
@ -215,8 +215,8 @@ func (e *endpoint) Name() string {
return e.ServiceName return e.ServiceName
} }
// TraceIDFromString creates a TraceID from a hexadecimal string // traceIDFromString creates a TraceID from a hexadecimal string
func TraceIDFromString(s string) (string, error) { func traceIDFromString(s string) (string, error) {
var hi, lo uint64 var hi, lo uint64
var err error var err error
if len(s) > 32 { if len(s) > 32 {
@ -240,8 +240,8 @@ func TraceIDFromString(s string) (string, error) {
return fmt.Sprintf("%x%016x", hi, lo), nil return fmt.Sprintf("%x%016x", hi, lo), nil
} }
// IDFromString validates the ID and returns it in hexadecimal format. // idFromString validates the ID and returns it in hexadecimal format.
func IDFromString(s string) (string, error) { func idFromString(s string) (string, error) {
if len(s) > 16 { if len(s) > 16 {
return "", fmt.Errorf("length of ID cannot be greater than 16 hex characters: %s", s) return "", fmt.Errorf("length of ID cannot be greater than 16 hex characters: %s", s)
} }

View File

@ -5,7 +5,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/zipkin/codec" "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec"
) )
@ -452,13 +452,12 @@ func TestJSON_Decode(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
j := &JSON{} j := &JSON{}
got, err := j.Decode(tt.octets) got, err := j.Decode(tt.octets)
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("JSON.Decode() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if !cmp.Equal(tt.want, got) {
t.Errorf("JSON.Decode() = got(-)/want(+) %s", cmp.Diff(tt.want, got))
} }
require.Equal(t, tt.want, got)
}) })
} }
} }
@ -502,13 +501,12 @@ func Test_span_Trace(t *testing.T) {
TraceID: tt.TraceID, TraceID: tt.TraceID,
} }
got, err := s.Trace() got, err := s.Trace()
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("span.Trace() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if !cmp.Equal(tt.want, got) {
t.Errorf("span.Trace() = got(-)/want(+) %s", cmp.Diff(tt.want, got))
} }
require.Equal(t, tt.want, got)
}) })
} }
} }
@ -552,13 +550,12 @@ func Test_span_SpanID(t *testing.T) {
ID: tt.ID, ID: tt.ID,
} }
got, err := s.SpanID() got, err := s.SpanID()
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("span.SpanID() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if !cmp.Equal(tt.want, got) {
t.Errorf("span.SpanID() = got(-)/want(+) %s", cmp.Diff(tt.want, got))
} }
require.Equal(t, tt.want, got)
}) })
} }
} }
@ -597,13 +594,12 @@ func Test_span_Parent(t *testing.T) {
ParentID: tt.ParentID, ParentID: tt.ParentID,
} }
got, err := s.Parent() got, err := s.Parent()
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("span.Parent() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if !cmp.Equal(tt.want, got) {
t.Errorf("span.Parent() = got(-)/want(+) %s", cmp.Diff(tt.want, got))
} }
require.Equal(t, tt.want, got)
}) })
} }
} }
@ -630,9 +626,7 @@ func Test_span_Timestamp(t *testing.T) {
s := &span{ s := &span{
Time: tt.Time, Time: tt.Time,
} }
if got := s.Timestamp(); !cmp.Equal(tt.want, got) { require.Equal(t, tt.want, s.Timestamp())
t.Errorf("span.Timestamp() = got(-)/want(+) %s", cmp.Diff(tt.want, got))
}
}) })
} }
} }
@ -659,9 +653,7 @@ func Test_span_Duration(t *testing.T) {
s := &span{ s := &span{
Dur: tt.dur, Dur: tt.dur,
} }
if got := s.Duration(); got != tt.want { require.Equal(t, tt.want, s.Duration())
t.Errorf("span.Duration() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -703,15 +695,9 @@ func Test_annotation(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
an := annotation(tt.fields) an := annotation(tt.fields)
a := &an a := &an
if got := a.Timestamp(); !got.Equal(tt.tm) { require.Equal(t, tt.tm, a.Timestamp())
t.Errorf("annotation.Timestamp() = %v, want %v", got, tt.tm) require.Equal(t, tt.val, a.Value())
} require.Equal(t, tt.endpoint, a.Host())
if got := a.Value(); got != tt.val {
t.Errorf("annotation.Value() = %v, want %v", got, tt.val)
}
if got := a.Host(); !cmp.Equal(tt.endpoint, got) {
t.Errorf("annotation.Endpoint() = %v, want %v", got, tt.endpoint)
}
}) })
} }
} }
@ -754,15 +740,9 @@ func Test_binaryAnnotation(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
bin := binaryAnnotation(tt.fields) bin := binaryAnnotation(tt.fields)
b := &bin b := &bin
if got := b.Key(); got != tt.key { require.Equal(t, tt.key, b.Key())
t.Errorf("binaryAnnotation.Key() = %v, want %v", got, tt.key) require.Equal(t, tt.value, b.Value())
} require.Equal(t, tt.endpoint, b.Host())
if got := b.Value(); got != tt.value {
t.Errorf("binaryAnnotation.Value() = %v, want %v", got, tt.value)
}
if got := b.Host(); !cmp.Equal(tt.endpoint, got) {
t.Errorf("binaryAnnotation.Endpoint() = %v, want %v", got, tt.endpoint)
}
}) })
} }
} }
@ -799,9 +779,7 @@ func Test_endpoint_Host(t *testing.T) {
Ipv4: tt.fields.Ipv4, Ipv4: tt.fields.Ipv4,
Port: tt.fields.Port, Port: tt.fields.Port,
} }
if got := e.Host(); got != tt.want { require.Equal(t, tt.want, e.Host())
t.Errorf("endpoint.Host() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -823,9 +801,7 @@ func Test_endpoint_Name(t *testing.T) {
e := &endpoint{ e := &endpoint{
ServiceName: tt.ServiceName, ServiceName: tt.ServiceName,
} }
if got := e.Name(); got != tt.want { require.Equal(t, tt.want, e.Name())
t.Errorf("endpoint.Name() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -870,14 +846,13 @@ func TestTraceIDFromString(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, err := TraceIDFromString(tt.s) got, err := traceIDFromString(tt.s)
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("TraceIDFromString() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if got != tt.want {
t.Errorf("TraceIDFromString() = %v, want %v", got, tt.want)
} }
require.Equal(t, tt.want, got)
}) })
} }
} }
@ -907,14 +882,13 @@ func TestIDFromString(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, err := IDFromString(tt.s) got, err := idFromString(tt.s)
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("IDFromString() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if got != tt.want {
t.Errorf("IDFromString() = %v, want %v", got, tt.want)
} }
require.Equal(t, tt.want, got)
}) })
} }
} }

View File

@ -15,8 +15,25 @@ import (
"github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift/gen-go/zipkincore" "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift/gen-go/zipkincore"
) )
// UnmarshalThrift converts raw bytes in thrift format to a slice of spans // Thrift decodes binary data to create a Trace
func UnmarshalThrift(body []byte) ([]*zipkincore.Span, error) { type Thrift struct{}
// Decode unmarshals and validates bytes in thrift format
func (*Thrift) Decode(octets []byte) ([]codec.Span, error) {
spans, err := unmarshalThrift(octets)
if err != nil {
return nil, err
}
res := make([]codec.Span, 0, len(spans))
for _, s := range spans {
res = append(res, &span{s})
}
return res, nil
}
// unmarshalThrift converts raw bytes in thrift format to a slice of spans
func unmarshalThrift(body []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer() buffer := thrift.NewTMemoryBuffer()
buffer.Write(body) buffer.Write(body)
@ -41,23 +58,6 @@ func UnmarshalThrift(body []byte) ([]*zipkincore.Span, error) {
return spans, nil return spans, nil
} }
// Thrift decodes binary data to create a Trace
type Thrift struct{}
// Decode unmarshals and validates bytes in thrift format
func (*Thrift) Decode(octets []byte) ([]codec.Span, error) {
spans, err := UnmarshalThrift(octets)
if err != nil {
return nil, err
}
res := make([]codec.Span, 0, len(spans))
for _, s := range spans {
res = append(res, &span{s})
}
return res, nil
}
var _ codec.Endpoint = &endpoint{} var _ codec.Endpoint = &endpoint{}
type endpoint struct { type endpoint struct {

View File

@ -4,7 +4,7 @@ import (
"os" "os"
"testing" "testing"
"github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift/gen-go/zipkincore" "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift/gen-go/zipkincore"
) )
@ -49,9 +49,7 @@ func Test_endpointHost(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
e := endpoint{tt.args.h} e := endpoint{tt.args.h}
if got := e.Host(); got != tt.want { require.Equal(t, tt.want, e.Host())
t.Errorf("host() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -85,9 +83,7 @@ func Test_endpointName(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
e := endpoint{tt.args.h} e := endpoint{tt.args.h}
if got := e.Name(); got != tt.want { require.Equal(t, tt.want, e.Name())
t.Errorf("serviceName() = %v, want %v", got, tt.want)
}
}) })
} }
} }
@ -198,14 +194,13 @@ func TestUnmarshalThrift(t *testing.T) {
t.Fatalf("Could not find file %s\n", tt.filename) t.Fatalf("Could not find file %s\n", tt.filename)
} }
got, err := UnmarshalThrift(dat) got, err := unmarshalThrift(dat)
if (err != nil) != tt.wantErr { if tt.wantErr {
t.Errorf("UnmarshalThrift() error = %v, wantErr %v", err, tt.wantErr) require.Error(t, err)
return } else {
} require.NoError(t, err)
if !cmp.Equal(tt.want, got) {
t.Errorf("UnmarshalThrift() got(-)/want(+): %s", cmp.Diff(tt.want, got))
} }
require.Equal(t, tt.want, got)
}) })
} }
} }

View File

@ -7,25 +7,22 @@ import (
"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
) )
// LineProtocolConverter implements the Recorder interface; it is a // lineProtocolConverter implements the recorder interface;
// type meant to encapsulate the storage of zipkin tracing data in // it is a type meant to encapsulate the storage of zipkin tracing data in telegraf as line protocol.
// telegraf as line protocol. type lineProtocolConverter struct {
type LineProtocolConverter struct {
acc telegraf.Accumulator acc telegraf.Accumulator
} }
// NewLineProtocolConverter returns an instance of LineProtocolConverter that // newLineProtocolConverter returns an instance of lineProtocolConverter that will add to the given telegraf.Accumulator
// will add to the given telegraf.Accumulator func newLineProtocolConverter(acc telegraf.Accumulator) *lineProtocolConverter {
func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter { return &lineProtocolConverter{
return &LineProtocolConverter{
acc: acc, acc: acc,
} }
} }
// Record is LineProtocolConverter's implementation of the Record method of // record is lineProtocolConverter's implementation of the record method of the recorder interface;
// the Recorder interface; it takes a trace as input, and adds it to an internal // it takes a trace as input, and adds it to an internal telegraf.Accumulator.
// telegraf.Accumulator. func (l *lineProtocolConverter) record(t trace.Trace) error {
func (l *LineProtocolConverter) Record(t trace.Trace) error {
for _, s := range t { for _, s := range t {
fields := map[string]interface{}{ fields := map[string]interface{}{
"duration_ns": s.Duration.Nanoseconds(), "duration_ns": s.Duration.Nanoseconds(),
@ -71,12 +68,11 @@ func (l *LineProtocolConverter) Record(t trace.Trace) error {
return nil return nil
} }
func (l *LineProtocolConverter) Error(err error) { func (l *lineProtocolConverter) error(err error) {
l.acc.AddError(err) l.acc.AddError(err)
} }
// formatName formats name and service name // formatName formats name and service name Zipkin forces span and service names to be lowercase:
// Zipkin forces span and service names to be lowercase:
// https://github.com/openzipkin/zipkin/pull/805 // https://github.com/openzipkin/zipkin/pull/805
func formatName(name string) string { func formatName(name string) string {
return strings.ToLower(name) return strings.ToLower(name)

View File

@ -4,7 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
@ -325,22 +325,24 @@ func TestLineProtocolConverter_Record(t *testing.T) {
}, },
}, },
} }
for i, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
mockAcc.ClearMetrics() mockAcc.ClearMetrics()
l := &LineProtocolConverter{ l := &lineProtocolConverter{
acc: tt.fields.acc, acc: tt.fields.acc,
} }
if err := l.Record(tt.args.t); (err != nil) != tt.wantErr { err := l.record(tt.args.t)
t.Errorf("LineProtocolConverter.Record() error = %v, wantErr %v", err, tt.wantErr) if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
} }
got := make([]testutil.Metric, 0, len(mockAcc.Metrics)) got := make([]testutil.Metric, 0, len(mockAcc.Metrics))
for _, metric := range mockAcc.Metrics { for _, metric := range mockAcc.Metrics {
got = append(got, *metric) got = append(got, *metric)
} }
if !cmp.Equal(got, tt.want) { require.Equal(t, tt.want, got)
t.Errorf("LineProtocolConverter.Record()/%s/%d error = %s ", tt.name, i, cmp.Diff(got, tt.want))
}
}) })
} }
} }

View File

@ -15,17 +15,15 @@ import (
"github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift" "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift"
) )
// SpanHandler is an implementation of a Handler which accepts zipkin thrift // spanHandler is an implementation of a handler which accepts zipkin thrift span data and sends it to the recorder
// span data and sends it to the recorder type spanHandler struct {
type SpanHandler struct { path string
Path string recorder recorder
recorder Recorder
} }
// NewSpanHandler returns a new server instance given path to handle func newSpanHandler(path string) *spanHandler {
func NewSpanHandler(path string) *SpanHandler { return &spanHandler{
return &SpanHandler{ path: path,
Path: path,
} }
} }
@ -58,17 +56,17 @@ func cors(next http.HandlerFunc) http.HandlerFunc {
} }
} }
// Register implements the Service interface. Register accepts zipkin thrift data // register implements the Service interface. Register accepts zipkin thrift data
// POSTed to the path of the mux router // POSTed to the path of the mux router
func (s *SpanHandler) Register(router *mux.Router, recorder Recorder) error { func (s *spanHandler) register(router *mux.Router, recorder recorder) error {
handler := cors(http.HandlerFunc(s.Spans)) handler := cors(http.HandlerFunc(s.spans))
router.Handle(s.Path, handler).Methods("POST", "OPTIONS") router.Handle(s.path, handler).Methods("POST", "OPTIONS")
s.recorder = recorder s.recorder = recorder
return nil return nil
} }
// Spans handles zipkin thrift spans // spans handles zipkin thrift spans
func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { func (s *spanHandler) spans(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() defer r.Body.Close()
body := r.Body body := r.Body
var err error var err error
@ -76,42 +74,42 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Encoding") == "gzip" { if r.Header.Get("Content-Encoding") == "gzip" {
body, err = gzip.NewReader(r.Body) body, err = gzip.NewReader(r.Body)
if err != nil { if err != nil {
s.recorder.Error(err) s.recorder.error(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
defer body.Close() defer body.Close()
} }
decoder, err := ContentDecoder(r) decoder, err := contentDecoder(r)
if err != nil { if err != nil {
s.recorder.Error(err) s.recorder.error(err)
w.WriteHeader(http.StatusUnsupportedMediaType) w.WriteHeader(http.StatusUnsupportedMediaType)
} }
octets, err := io.ReadAll(body) octets, err := io.ReadAll(body)
if err != nil { if err != nil {
s.recorder.Error(err) s.recorder.error(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
spans, err := decoder.Decode(octets) spans, err := decoder.Decode(octets)
if err != nil { if err != nil {
s.recorder.Error(err) s.recorder.error(err)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
trace, err := codec.NewTrace(spans) trace, err := codec.NewTrace(spans)
if err != nil { if err != nil {
s.recorder.Error(err) s.recorder.error(err)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
if err = s.recorder.Record(trace); err != nil { if err = s.recorder.record(trace); err != nil {
s.recorder.Error(err) s.recorder.error(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
@ -119,10 +117,10 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
// ContentDecoder returns a Decoder that is able to produce Traces from bytes. // contentDecoder returns a Decoder that is able to produce Traces from bytes.
// Failure should yield an HTTP 415 (`http.StatusUnsupportedMediaType`) // Failure should yield an HTTP 415 (`http.StatusUnsupportedMediaType`)
// If a Content-Type is not set, zipkin assumes application/json // If a Content-Type is not set, zipkin assumes application/json
func ContentDecoder(r *http.Request) (codec.Decoder, error) { func contentDecoder(r *http.Request) (codec.Decoder, error) {
contentType := r.Header.Get("Content-Type") contentType := r.Header.Get("Content-Type")
if contentType == "" { if contentType == "" {
return &json_v1.JSON{}, nil return &json_v1.JSON{}, nil

View File

@ -10,23 +10,23 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
) )
type MockRecorder struct { type mockRecorder struct {
Data trace.Trace data trace.Trace
Err error err error
} }
func (m *MockRecorder) Record(t trace.Trace) error { func (m *mockRecorder) record(t trace.Trace) error {
m.Data = t m.data = t
return nil return nil
} }
func (m *MockRecorder) Error(err error) { func (m *mockRecorder) error(err error) {
m.Err = err m.err = err
} }
func TestSpanHandler(t *testing.T) { func TestSpanHandler(t *testing.T) {
@ -43,16 +43,14 @@ func TestSpanHandler(t *testing.T) {
bytes.NewReader(dat))) bytes.NewReader(dat)))
r.Header.Set("Content-Type", "application/x-thrift") r.Header.Set("Content-Type", "application/x-thrift")
handler := NewSpanHandler("/api/v1/spans") handler := newSpanHandler("/api/v1/spans")
mockRecorder := &MockRecorder{} mockRecorder := &mockRecorder{}
handler.recorder = mockRecorder handler.recorder = mockRecorder
handler.Spans(w, r) handler.spans(w, r)
if w.Code != http.StatusNoContent { require.Equal(t, http.StatusNoContent, w.Code)
t.Errorf("MainHandler did not return StatusNoContent %d", w.Code)
}
got := mockRecorder.Data got := mockRecorder.data
parentID := strconv.FormatInt(22964302721410078, 16) parentID := strconv.FormatInt(22964302721410078, 16)
want := trace.Trace{ want := trace.Trace{
@ -131,7 +129,5 @@ func TestSpanHandler(t *testing.T) {
}, },
} }
if !cmp.Equal(got, want) { require.Equal(t, want, got)
t.Fatalf("Got != Want\n %s", cmp.Diff(got, want))
}
} }

View File

@ -22,6 +22,11 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
var (
// defaultNetwork is the network to listen on; use only in tests.
defaultNetwork = "tcp"
)
const ( const (
// defaultPort is the default port zipkin listens on, which zipkin implementations expect. // defaultPort is the default port zipkin listens on, which zipkin implementations expect.
defaultPort = 9411 defaultPort = 9411
@ -36,51 +41,35 @@ const (
defaultWriteTimeout = 10 * time.Second defaultWriteTimeout = 10 * time.Second
) )
var (
// defaultNetwork is the network to listen on; use only in tests.
defaultNetwork = "tcp"
)
// Recorder represents a type which can record zipkin trace data as well as
// any accompanying errors, and process that data.
type Recorder interface {
Record(trace.Trace) error
Error(error)
}
// Handler represents a type which can register itself with a router for
// http routing, and a Recorder for trace data collection.
type Handler interface {
Register(router *mux.Router, recorder Recorder) error
}
// Zipkin is a telegraf configuration structure for the zipkin input plugin,
// but it also contains fields for the management of a separate, concurrent
// zipkin http server
type Zipkin struct { type Zipkin struct {
Port int `toml:"port"` Port int `toml:"port"`
Path string `toml:"path"` Path string `toml:"path"`
ReadTimeout config.Duration `toml:"read_timeout"` ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"` WriteTimeout config.Duration `toml:"write_timeout"`
Log telegraf.Logger Log telegraf.Logger `toml:"-"`
address string address string
handler Handler handler handler
server *http.Server server *http.Server
waitGroup *sync.WaitGroup waitGroup *sync.WaitGroup
} }
// recorder represents a type which can record zipkin trace data as well as any accompanying errors, and process that data.
type recorder interface {
record(trace.Trace) error
error(error)
}
// handler represents a type which can register itself with a router for http routing, and a recorder for trace data collection.
type handler interface {
register(router *mux.Router, recorder recorder) error
}
func (*Zipkin) SampleConfig() string { func (*Zipkin) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather is empty for the zipkin plugin; all gathering is done through
// the separate goroutine launched in (*Zipkin).Start()
func (*Zipkin) Gather(telegraf.Accumulator) error { return nil }
// Start launches a separate goroutine for collecting zipkin client http requests,
// passing in a telegraf.Accumulator such that data can be collected.
func (z *Zipkin) Start(acc telegraf.Accumulator) error { func (z *Zipkin) Start(acc telegraf.Accumulator) error {
if z.ReadTimeout < config.Duration(time.Second) { if z.ReadTimeout < config.Duration(time.Second) {
z.ReadTimeout = config.Duration(defaultReadTimeout) z.ReadTimeout = config.Duration(defaultReadTimeout)
@ -89,14 +78,14 @@ func (z *Zipkin) Start(acc telegraf.Accumulator) error {
z.WriteTimeout = config.Duration(defaultWriteTimeout) z.WriteTimeout = config.Duration(defaultWriteTimeout)
} }
z.handler = NewSpanHandler(z.Path) z.handler = newSpanHandler(z.Path)
var wg sync.WaitGroup var wg sync.WaitGroup
z.waitGroup = &wg z.waitGroup = &wg
router := mux.NewRouter() router := mux.NewRouter()
converter := NewLineProtocolConverter(acc) converter := newLineProtocolConverter(acc)
if err := z.handler.Register(router, converter); err != nil { if err := z.handler.register(router, converter); err != nil {
return err return err
} }
@ -119,13 +108,14 @@ func (z *Zipkin) Start(acc telegraf.Accumulator) error {
go func() { go func() {
defer wg.Done() defer wg.Done()
z.Listen(ln, acc) z.listen(ln, acc)
}() }()
return nil return nil
} }
// Stop shuts the internal http server down with via context.Context func (*Zipkin) Gather(telegraf.Accumulator) error { return nil }
func (z *Zipkin) Stop() { func (z *Zipkin) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
@ -135,9 +125,9 @@ func (z *Zipkin) Stop() {
z.server.Shutdown(ctx) //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway z.server.Shutdown(ctx) //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway
} }
// Listen creates a http server on the zipkin instance it is called with, and // listen creates a http server on the zipkin instance it is called with, and
// serves http until it is stopped by Zipkin's (*Zipkin).Stop() method. // serves http until it is stopped by Zipkin's (*Zipkin).Stop() method.
func (z *Zipkin) Listen(ln net.Listener, acc telegraf.Accumulator) { func (z *Zipkin) listen(ln net.Listener, acc telegraf.Accumulator) {
if err := z.server.Serve(ln); err != nil { if err := z.server.Serve(ln); err != nil {
// Because of the clean shutdown in `(*Zipkin).Stop()` // Because of the clean shutdown in `(*Zipkin).Stop()`
// We're expecting a server closed error at some point // We're expecting a server closed error at some point

View File

@ -8,7 +8,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -626,9 +626,8 @@ func TestZipkinPlugin(t *testing.T) {
for _, m := range mockAcc.Metrics { for _, m := range mockAcc.Metrics {
got = append(got, *m) got = append(got, *m)
} }
if !cmp.Equal(tt.want, got) {
t.Fatalf("Got != Want\n %s", cmp.Diff(tt.want, got)) require.Equal(t, tt.want, got)
}
}) })
} }
mockAcc.ClearMetrics() mockAcc.ClearMetrics()

View File

@ -24,7 +24,6 @@ var sampleConfig string
var zookeeperFormatRE = regexp.MustCompile(`^zk_(\w[\w\.\-]*)\s+([\w\.\-]+)`) var zookeeperFormatRE = regexp.MustCompile(`^zk_(\w[\w\.\-]*)\s+([\w\.\-]+)`)
// Zookeeper is a zookeeper plugin
type Zookeeper struct { type Zookeeper struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
Timeout config.Duration `toml:"timeout"` Timeout config.Duration `toml:"timeout"`
@ -38,25 +37,10 @@ type Zookeeper struct {
tlsConfig *tls.Config tlsConfig *tls.Config
} }
var defaultTimeout = 5 * time.Second
func (z *Zookeeper) dial(ctx context.Context, addr string) (net.Conn, error) {
var dialer net.Dialer
if z.EnableTLS || z.EnableSSL {
deadline, ok := ctx.Deadline()
if ok {
dialer.Deadline = deadline
}
return tls.DialWithDialer(&dialer, "tcp", addr, z.tlsConfig)
}
return dialer.DialContext(ctx, "tcp", addr)
}
func (*Zookeeper) SampleConfig() string { func (*Zookeeper) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather reads stats from all configured servers accumulates stats
func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { func (z *Zookeeper) Gather(acc telegraf.Accumulator) error {
ctx := context.Background() ctx := context.Background()
@ -70,7 +54,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error {
} }
if z.Timeout < config.Duration(1*time.Second) { if z.Timeout < config.Duration(1*time.Second) {
z.Timeout = config.Duration(defaultTimeout) z.Timeout = config.Duration(5 * time.Second)
} }
ctx, cancel := context.WithTimeout(ctx, time.Duration(z.Timeout)) ctx, cancel := context.WithTimeout(ctx, time.Duration(z.Timeout))
@ -170,6 +154,18 @@ func (z *Zookeeper) gatherServer(ctx context.Context, address string, acc telegr
return nil return nil
} }
func (z *Zookeeper) dial(ctx context.Context, addr string) (net.Conn, error) {
var dialer net.Dialer
if z.EnableTLS || z.EnableSSL {
deadline, ok := ctx.Deadline()
if ok {
dialer.Deadline = deadline
}
return tls.DialWithDialer(&dialer, "tcp", addr, z.tlsConfig)
}
return dialer.DialContext(ctx, "tcp", addr)
}
func init() { func init() {
inputs.Add("zookeeper", func() telegraf.Input { inputs.Add("zookeeper", func() telegraf.Input {
return &Zookeeper{} return &Zookeeper{}