diff --git a/plugins/inputs/wireguard/wireguard.go b/plugins/inputs/wireguard/wireguard.go index 4e4db1b42..da53c6120 100644 --- a/plugins/inputs/wireguard/wireguard.go +++ b/plugins/inputs/wireguard/wireguard.go @@ -16,11 +16,6 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - measurementDevice = "wireguard_device" - measurementPeer = "wireguard_peer" -) - var ( deviceTypeNames = map[wgtypes.DeviceType]string{ wgtypes.Unknown: "unknown", @@ -29,8 +24,11 @@ var ( } ) -// Wireguard is an input that enumerates all Wireguard interfaces/devices on -// the host, and reports gauge metrics for the device itself and its peers. +const ( + measurementDevice = "wireguard_device" + measurementPeer = "wireguard_peer" +) + type Wireguard struct { Devices []string `toml:"devices"` Log telegraf.Logger `toml:"-"` @@ -44,7 +42,6 @@ func (*Wireguard) SampleConfig() string { func (wg *Wireguard) Init() error { var err error - wg.client, err = wgctrl.New() return err diff --git a/plugins/inputs/wireless/wireless.go b/plugins/inputs/wireless/wireless.go index 8fbe4327a..92fbeba27 100644 --- a/plugins/inputs/wireless/wireless.go +++ b/plugins/inputs/wireless/wireless.go @@ -11,7 +11,6 @@ import ( //go:embed sample.conf var sampleConfig string -// Wireless is used to store configuration values. type Wireless struct { HostProc string `toml:"host_proc"` Log telegraf.Logger `toml:"-"` diff --git a/plugins/inputs/wireless/wireless_linux.go b/plugins/inputs/wireless/wireless_linux.go index 54da833d5..3653a3150 100644 --- a/plugins/inputs/wireless/wireless_linux.go +++ b/plugins/inputs/wireless/wireless_linux.go @@ -14,11 +14,11 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) +var newLineByte = []byte("\n") + // length of wireless interface fields const interfaceFieldLength = 10 -var newLineByte = []byte("\n") - type wirelessInterface struct { Interface string Status int64 @@ -33,7 +33,6 @@ type wirelessInterface struct { Beacon int64 } -// Gather collects the wireless information. func (w *Wireless) Gather(acc telegraf.Accumulator) error { // load proc path, get default value if config value and env variable are empty if w.HostProc == "" { diff --git a/plugins/inputs/wireless/wireless_notlinux.go b/plugins/inputs/wireless/wireless_notlinux.go index 8b876211c..271fc57a4 100644 --- a/plugins/inputs/wireless/wireless_notlinux.go +++ b/plugins/inputs/wireless/wireless_notlinux.go @@ -12,7 +12,7 @@ func (w *Wireless) Init() error { return nil } -func (*Wireless) Gather(_ telegraf.Accumulator) error { +func (*Wireless) Gather(telegraf.Accumulator) error { return nil } diff --git a/plugins/inputs/x509_cert/x509_cert.go b/plugins/inputs/x509_cert/x509_cert.go index 25d25b6c2..68179bf64 100644 --- a/plugins/inputs/x509_cert/x509_cert.go +++ b/plugins/inputs/x509_cert/x509_cert.go @@ -38,7 +38,6 @@ var sampleConfig string // Regexp for handling file URIs containing a drive letter and leading slash var reDriveLetter = regexp.MustCompile(`^/([a-zA-Z]:/)`) -// X509Cert holds the configuration of the plugin. type X509Cert struct { Sources []string `toml:"sources"` Timeout config.Duration `toml:"timeout"` @@ -93,7 +92,6 @@ func (c *X509Cert) Init() error { return nil } -// Gather adds metrics into the accumulator. func (c *X509Cert) Gather(acc telegraf.Accumulator) error { now := time.Now() diff --git a/plugins/inputs/zfs/zfs.go b/plugins/inputs/zfs/zfs.go index 9b135acd9..45ce0d522 100644 --- a/plugins/inputs/zfs/zfs.go +++ b/plugins/inputs/zfs/zfs.go @@ -10,23 +10,14 @@ import ( //go:embed sample.conf var sampleConfig string -type Sysctl func(metric string) ([]string, error) -type Zpool func() ([]string, error) -type Zdataset func(properties []string) ([]string, error) -type Uname func() (string, error) - type Zfs struct { - KstatPath string - KstatMetrics []string - PoolMetrics bool - DatasetMetrics bool + KstatPath string `toml:"kstatPath"` + KstatMetrics []string `toml:"kstatMetrics"` + PoolMetrics bool `toml:"poolMetrics"` + DatasetMetrics bool `toml:"datasetMetrics"` Log telegraf.Logger `toml:"-"` - sysctl Sysctl //nolint:unused // False positive - this var is used for non-default build tag: freebsd - zpool Zpool //nolint:unused // False positive - this var is used for non-default build tag: freebsd - zdataset Zdataset //nolint:unused // False positive - this var is used for non-default build tag: freebsd - uname Uname //nolint:unused // False positive - this var is used for non-default build tag: freebsd - version int64 //nolint:unused // False positive - this var is used for non-default build tag: freebsd + helper //nolint:unused // for OS-specific usage } func (*Zfs) SampleConfig() string { diff --git a/plugins/inputs/zfs/zfs_freebsd.go b/plugins/inputs/zfs/zfs_freebsd.go index 2366c3906..520b0724e 100644 --- a/plugins/inputs/zfs/zfs_freebsd.go +++ b/plugins/inputs/zfs/zfs_freebsd.go @@ -15,6 +15,18 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) +type helper struct { + sysctl sysctlF + zpool zpoolF + zdataset zdatasetF + uname unameF +} + +type sysctlF func(metric string) ([]string, error) +type zpoolF func() ([]string, error) +type zdatasetF func(properties []string) ([]string, error) +type unameF func() (string, error) + func (z *Zfs) Init() error { // Determine the kernel version to adapt parsing release, err := z.uname() @@ -22,7 +34,7 @@ func (z *Zfs) Init() error { return fmt.Errorf("determining uname failed: %w", err) } parts := strings.SplitN(release, ".", 2) - z.version, err = strconv.ParseInt(parts[0], 10, 64) + version, err := strconv.ParseInt(parts[0], 10, 64) if err != nil { return fmt.Errorf("determining version from %q failed: %w", release, err) } @@ -31,7 +43,7 @@ func (z *Zfs) Init() error { // Please note that starting from FreeBSD 14 the 'vdev_cache_stats' are // no longer available. if len(z.KstatMetrics) == 0 { - if z.version < 14 { + if version < 14 { z.KstatMetrics = []string{"arcstats", "zfetchstats", "vdev_cache_stats"} } else { z.KstatMetrics = []string{"arcstats", "zfetchstats"} @@ -251,10 +263,12 @@ func uname() (string, error) { func init() { inputs.Add("zfs", func() telegraf.Input { return &Zfs{ - sysctl: sysctl, - zpool: zpool, - zdataset: zdataset, - uname: uname, + helper: helper{ + sysctl: sysctl, + zpool: zpool, + zdataset: zdataset, + uname: uname, + }, } }) } diff --git a/plugins/inputs/zfs/zfs_linux.go b/plugins/inputs/zfs/zfs_linux.go index 65ab8f914..b6ede87ca 100644 --- a/plugins/inputs/zfs/zfs_linux.go +++ b/plugins/inputs/zfs/zfs_linux.go @@ -14,170 +14,21 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) -type metricsVersion uint8 - const ( unknown metricsVersion = iota v1 v2 ) +type metricsVersion uint8 + type poolInfo struct { name string ioFilename string version metricsVersion } -func probeVersion(kstatPath string) (metricsVersion, []string, error) { - poolsDirs, err := filepath.Glob(kstatPath + "/*/objset-*") - - // From the docs: the only possible returned error is ErrBadPattern, when pattern is malformed. - // Because of this we need to determine how to fallback differently. - if err != nil { - return unknown, poolsDirs, err - } - - if len(poolsDirs) > 0 { - return v2, poolsDirs, nil - } - - // Fallback to the old kstat in case of an older ZFS version. - poolsDirs, err = filepath.Glob(kstatPath + "/*/io") - if err != nil { - return unknown, poolsDirs, err - } - - return v1, poolsDirs, nil -} - -func getPools(kstatPath string) ([]poolInfo, error) { - pools := make([]poolInfo, 0) - version, poolsDirs, err := probeVersion(kstatPath) - if err != nil { - return nil, err - } - - for _, poolDir := range poolsDirs { - poolDirSplit := strings.Split(poolDir, "/") - pool := poolDirSplit[len(poolDirSplit)-2] - pools = append(pools, poolInfo{name: pool, ioFilename: poolDir, version: version}) - } - - return pools, nil -} - -func getTags(pools []poolInfo) map[string]string { - poolNames := "" - knownPools := make(map[string]struct{}) - for _, entry := range pools { - name := entry.name - if _, ok := knownPools[name]; !ok { - knownPools[name] = struct{}{} - if poolNames != "" { - poolNames += "::" - } - poolNames += name - } - } - - return map[string]string{"pools": poolNames} -} - -func gather(lines []string, fileLines int) (keys, values []string, err error) { - if len(lines) < fileLines { - return nil, nil, errors.New("expected lines in kstat does not match") - } - - keys = strings.Fields(lines[1]) - values = strings.Fields(lines[2]) - if len(keys) != len(values) { - return nil, nil, fmt.Errorf("key and value count don't match Keys:%v Values:%v", keys, values) - } - - return keys, values, nil -} - -func gatherV1(lines []string) (map[string]interface{}, error) { - fileLines := 3 - keys, values, err := gather(lines, fileLines) - if err != nil { - return nil, err - } - - fields := make(map[string]interface{}) - for i := 0; i < len(keys); i++ { - value, err := strconv.ParseInt(values[i], 10, 64) - if err != nil { - return nil, err - } - - fields[keys[i]] = value - } - - return fields, nil -} - -// New way of collection. Each objset-* file in ZFS >= 2.1.x has a format looking like this: -// 36 1 0x01 7 2160 5214787391 73405258558961 -// name type data -// dataset_name 7 rpool/ROOT/pve-1 -// writes 4 409570 -// nwritten 4 2063419969 -// reads 4 22108699 -// nread 4 63067280992 -// nunlinks 4 13849 -// nunlinked 4 13848 -// -// For explanation of the first line's values see https://github.com/openzfs/zfs/blob/master/module/os/linux/spl/spl-kstat.c#L61 -func gatherV2(lines []string, tags map[string]string) (map[string]interface{}, error) { - fileLines := 9 - _, _, err := gather(lines, fileLines) - if err != nil { - return nil, err - } - - tags["dataset"] = strings.Fields(lines[2])[2] - fields := make(map[string]interface{}) - for i := 3; i < len(lines); i++ { - lineFields := strings.Fields(lines[i]) - fieldName := lineFields[0] - fieldData := lineFields[2] - value, err := strconv.ParseInt(fieldData, 10, 64) - if err != nil { - return nil, err - } - - fields[fieldName] = value - } - - return fields, nil -} - -func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error { - lines, err := internal.ReadLines(pool.ioFilename) - if err != nil { - return err - } - - var fields map[string]interface{} - var gatherErr error - tags := map[string]string{"pool": pool.name} - switch pool.version { - case v1: - fields, gatherErr = gatherV1(lines) - case v2: - fields, gatherErr = gatherV2(lines, tags) - case unknown: - return errors.New("unknown metrics version detected") - } - - if gatherErr != nil { - return gatherErr - } - - acc.AddFields("zfs_pool", fields, tags) - return nil -} +type helper struct{} //nolint:unused // not used for "linux" OS, needed for Zfs struct func (z *Zfs) Gather(acc telegraf.Accumulator) error { kstatMetrics := z.KstatMetrics @@ -236,6 +87,157 @@ func (z *Zfs) Gather(acc telegraf.Accumulator) error { return nil } +func getPools(kstatPath string) ([]poolInfo, error) { + pools := make([]poolInfo, 0) + version, poolsDirs, err := probeVersion(kstatPath) + if err != nil { + return nil, err + } + + for _, poolDir := range poolsDirs { + poolDirSplit := strings.Split(poolDir, "/") + pool := poolDirSplit[len(poolDirSplit)-2] + pools = append(pools, poolInfo{name: pool, ioFilename: poolDir, version: version}) + } + + return pools, nil +} + +func probeVersion(kstatPath string) (metricsVersion, []string, error) { + poolsDirs, err := filepath.Glob(kstatPath + "/*/objset-*") + + // From the docs: the only possible returned error is ErrBadPattern, when pattern is malformed. + // Because of this we need to determine how to fallback differently. + if err != nil { + return unknown, poolsDirs, err + } + + if len(poolsDirs) > 0 { + return v2, poolsDirs, nil + } + + // Fallback to the old kstat in case of an older ZFS version. + poolsDirs, err = filepath.Glob(kstatPath + "/*/io") + if err != nil { + return unknown, poolsDirs, err + } + + return v1, poolsDirs, nil +} + +func getTags(pools []poolInfo) map[string]string { + poolNames := "" + knownPools := make(map[string]struct{}) + for _, entry := range pools { + name := entry.name + if _, ok := knownPools[name]; !ok { + knownPools[name] = struct{}{} + if poolNames != "" { + poolNames += "::" + } + poolNames += name + } + } + + return map[string]string{"pools": poolNames} +} + +func gatherPoolStats(pool poolInfo, acc telegraf.Accumulator) error { + lines, err := internal.ReadLines(pool.ioFilename) + if err != nil { + return err + } + + var fields map[string]interface{} + var gatherErr error + tags := map[string]string{"pool": pool.name} + switch pool.version { + case v1: + fields, gatherErr = gatherV1(lines) + case v2: + fields, gatherErr = gatherV2(lines, tags) + case unknown: + return errors.New("unknown metrics version detected") + } + + if gatherErr != nil { + return gatherErr + } + + acc.AddFields("zfs_pool", fields, tags) + return nil +} + +func gatherV1(lines []string) (map[string]interface{}, error) { + fileLines := 3 + keys, values, err := gather(lines, fileLines) + if err != nil { + return nil, err + } + + fields := make(map[string]interface{}) + for i := 0; i < len(keys); i++ { + value, err := strconv.ParseInt(values[i], 10, 64) + if err != nil { + return nil, err + } + + fields[keys[i]] = value + } + + return fields, nil +} + +func gather(lines []string, fileLines int) (keys, values []string, err error) { + if len(lines) < fileLines { + return nil, nil, errors.New("expected lines in kstat does not match") + } + + keys = strings.Fields(lines[1]) + values = strings.Fields(lines[2]) + if len(keys) != len(values) { + return nil, nil, fmt.Errorf("key and value count don't match Keys:%v Values:%v", keys, values) + } + + return keys, values, nil +} + +// New way of collection. Each objset-* file in ZFS >= 2.1.x has a format looking like this: +// 36 1 0x01 7 2160 5214787391 73405258558961 +// name type data +// dataset_name 7 rpool/ROOT/pve-1 +// writes 4 409570 +// nwritten 4 2063419969 +// reads 4 22108699 +// nread 4 63067280992 +// nunlinks 4 13849 +// nunlinked 4 13848 +// +// For explanation of the first line's values see https://github.com/openzfs/zfs/blob/master/module/os/linux/spl/spl-kstat.c#L61 +func gatherV2(lines []string, tags map[string]string) (map[string]interface{}, error) { + fileLines := 9 + _, _, err := gather(lines, fileLines) + if err != nil { + return nil, err + } + + tags["dataset"] = strings.Fields(lines[2])[2] + fields := make(map[string]interface{}) + for i := 3; i < len(lines); i++ { + lineFields := strings.Fields(lines[i]) + fieldName := lineFields[0] + fieldData := lineFields[2] + value, err := strconv.ParseInt(fieldData, 10, 64) + if err != nil { + return nil, err + } + + fields[fieldName] = value + } + + return fields, nil +} + func init() { inputs.Add("zfs", func() telegraf.Input { return &Zfs{} diff --git a/plugins/inputs/zfs/zfs_other.go b/plugins/inputs/zfs/zfs_other.go index 6a7cb2f71..44332e9f8 100644 --- a/plugins/inputs/zfs/zfs_other.go +++ b/plugins/inputs/zfs/zfs_other.go @@ -7,7 +7,14 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) -func (*Zfs) Gather(_ telegraf.Accumulator) error { +type helper struct{} //nolint:unused // not used for "other" OSes, needed for Zfs struct + +func (z *Zfs) Init() error { + z.Log.Warn("Current platform is not supported") + return nil +} + +func (*Zfs) Gather(telegraf.Accumulator) error { return nil } diff --git a/plugins/inputs/zipkin/cmd/stress_test_write/stress_test_write.go b/plugins/inputs/zipkin/cmd/stress_test_write/stress_test_write.go index 7757aecf4..861c75240 100644 --- a/plugins/inputs/zipkin/cmd/stress_test_write/stress_test_write.go +++ b/plugins/inputs/zipkin/cmd/stress_test_write/stress_test_write.go @@ -33,29 +33,29 @@ import ( ) var ( - BatchSize int - MaxBackLog int - BatchTimeInterval int - SpanCount int - ZipkinServerHost string + batchSize int + maxBackLog int + batchTimeInterval int + spanCount int + zipkinServerHost string ) func init() { - flag.IntVar(&BatchSize, "batch_size", 10000, "") - flag.IntVar(&MaxBackLog, "max_backlog", 100000, "") - flag.IntVar(&BatchTimeInterval, "batch_interval", 1, "") - flag.IntVar(&SpanCount, "span_count", 100000, "") - flag.StringVar(&ZipkinServerHost, "zipkin_host", "localhost", "") + flag.IntVar(&batchSize, "batch_size", 10000, "") + flag.IntVar(&maxBackLog, "max_backlog", 100000, "") + flag.IntVar(&batchTimeInterval, "batch_interval", 1, "") + flag.IntVar(&spanCount, "span_count", 100000, "") + flag.StringVar(&zipkinServerHost, "zipkin_host", "localhost", "") } func main() { flag.Parse() - var hostname = fmt.Sprintf("http://%s:9411/api/v1/spans", ZipkinServerHost) + var hostname = fmt.Sprintf("http://%s:9411/api/v1/spans", zipkinServerHost) reporter := zipkinhttp.NewReporter( hostname, - zipkinhttp.BatchSize(BatchSize), - zipkinhttp.MaxBacklog(MaxBackLog), - zipkinhttp.BatchInterval(time.Duration(BatchTimeInterval)*time.Second), + zipkinhttp.BatchSize(batchSize), + zipkinhttp.MaxBacklog(maxBackLog), + zipkinhttp.BatchInterval(time.Duration(batchTimeInterval)*time.Second), ) defer reporter.Close() @@ -71,8 +71,8 @@ func main() { tracer := zipkinot.Wrap(nativeTracer) - log.Printf("Writing %d spans to zipkin server at %s\n", SpanCount, hostname) - for i := 0; i < SpanCount; i++ { + log.Printf("Writing %d spans to zipkin server at %s\n", spanCount, hostname) + for i := 0; i < spanCount; i++ { parent := tracer.StartSpan("Parent") parent.LogFields(otlog.Message(fmt.Sprintf("Trace%d", i))) parent.Finish() diff --git a/plugins/inputs/zipkin/codec/codec_test.go b/plugins/inputs/zipkin/codec/codec_test.go index b3476aa4d..f68500556 100644 --- a/plugins/inputs/zipkin/codec/codec_test.go +++ b/plugins/inputs/zipkin/codec/codec_test.go @@ -2,11 +2,10 @@ package codec import ( "errors" - "reflect" "testing" "time" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" ) @@ -35,9 +34,7 @@ func Test_MicroToTime(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := MicroToTime(tt.micro); !reflect.DeepEqual(got, tt.want) { - t.Errorf("microToTime() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, MicroToTime(tt.micro)) }) } } @@ -110,13 +107,10 @@ func Test_minMax(t *testing.T) { if tt.now != nil { now = tt.now } - got, got1 := minMax(tt.span) - if !reflect.DeepEqual(got, tt.wantMin) { - t.Errorf("minMax() got = %v, want %v", got, tt.wantMin) - } - if !reflect.DeepEqual(got1, tt.wantMax) { - t.Errorf("minMax() got1 = %v, want %v", got1, tt.wantMax) - } + gotMin, gotMax := minMax(tt.span) + require.Equal(t, tt.wantMin, gotMin) + require.Equal(t, tt.wantMax, gotMax) + now = time.Now }) } @@ -179,9 +173,7 @@ func Test_guessTimestamp(t *testing.T) { if tt.now != nil { now = tt.now } - if got := guessTimestamp(tt.span); !reflect.DeepEqual(got, tt.want) { - t.Errorf("guessTimestamp() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, guessTimestamp(tt.span)) now = time.Now }) } @@ -220,9 +212,7 @@ func Test_convertDuration(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := convertDuration(tt.span); got != tt.want { - t.Errorf("convertDuration() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, convertDuration(tt.span)) }) } } @@ -259,13 +249,12 @@ func Test_parentID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := parentID(tt.span) - if (err != nil) != tt.wantErr { - t.Errorf("parentID() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("parentID() = %v, want %v", got, tt.want) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } @@ -346,9 +335,7 @@ func Test_serviceEndpoint(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := serviceEndpoint(tt.ann, tt.bann); !reflect.DeepEqual(got, tt.want) { - t.Errorf("serviceEndpoint() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, serviceEndpoint(tt.ann, tt.bann)) }) } } @@ -388,9 +375,7 @@ func TestNewBinaryAnnotations(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewBinaryAnnotations(tt.annotations, tt.endpoint); !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewBinaryAnnotations() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, NewBinaryAnnotations(tt.annotations, tt.endpoint)) }) } } @@ -430,9 +415,7 @@ func TestNewAnnotations(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewAnnotations(tt.annotations, tt.endpoint); !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewAnnotations() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, NewAnnotations(tt.annotations, tt.endpoint)) }) } } @@ -524,13 +507,12 @@ func TestNewTrace(t *testing.T) { now = tt.now } got, err := NewTrace(tt.spans) - if (err != nil) != tt.wantErr { - t.Errorf("NewTrace() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !cmp.Equal(tt.want, got) { - t.Errorf("NewTrace() = %s", cmp.Diff(tt.want, got)) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) now = time.Now }) } diff --git a/plugins/inputs/zipkin/codec/jsonV1/jsonV1.go b/plugins/inputs/zipkin/codec/jsonV1/jsonV1.go index d35318568..c3c843c36 100644 --- a/plugins/inputs/zipkin/codec/jsonV1/jsonV1.go +++ b/plugins/inputs/zipkin/codec/jsonV1/jsonV1.go @@ -24,7 +24,7 @@ func (*JSON) Decode(octets []byte) ([]codec.Span, error) { res := make([]codec.Span, 0, len(spans)) for i := range spans { - if err := spans[i].Validate(); err != nil { + if err := spans[i].validate(); err != nil { return nil, err } res = append(res, &spans[i]) @@ -44,7 +44,7 @@ type span struct { BAnno []binaryAnnotation `json:"binaryAnnotations"` } -func (s *span) Validate() error { +func (s *span) validate() error { var err error check := func(f func() (string, error)) { if err != nil { @@ -68,21 +68,21 @@ func (s *span) Trace() (string, error) { if s.TraceID == "" { return "", errors.New("trace ID cannot be null") } - return TraceIDFromString(s.TraceID) + return traceIDFromString(s.TraceID) } func (s *span) SpanID() (string, error) { if s.ID == "" { return "", errors.New("span ID cannot be null") } - return IDFromString(s.ID) + return idFromString(s.ID) } func (s *span) Parent() (string, error) { if s.ParentID == "" { return "", nil } - return IDFromString(s.ParentID) + return idFromString(s.ParentID) } func (s *span) Name() string { @@ -215,8 +215,8 @@ func (e *endpoint) Name() string { return e.ServiceName } -// TraceIDFromString creates a TraceID from a hexadecimal string -func TraceIDFromString(s string) (string, error) { +// traceIDFromString creates a TraceID from a hexadecimal string +func traceIDFromString(s string) (string, error) { var hi, lo uint64 var err error if len(s) > 32 { @@ -240,8 +240,8 @@ func TraceIDFromString(s string) (string, error) { return fmt.Sprintf("%x%016x", hi, lo), nil } -// IDFromString validates the ID and returns it in hexadecimal format. -func IDFromString(s string) (string, error) { +// idFromString validates the ID and returns it in hexadecimal format. +func idFromString(s string) (string, error) { if len(s) > 16 { return "", fmt.Errorf("length of ID cannot be greater than 16 hex characters: %s", s) } diff --git a/plugins/inputs/zipkin/codec/jsonV1/jsonV1_test.go b/plugins/inputs/zipkin/codec/jsonV1/jsonV1_test.go index 36ceba33a..5b9ac2022 100644 --- a/plugins/inputs/zipkin/codec/jsonV1/jsonV1_test.go +++ b/plugins/inputs/zipkin/codec/jsonV1/jsonV1_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec" ) @@ -452,13 +452,12 @@ func TestJSON_Decode(t *testing.T) { t.Run(tt.name, func(t *testing.T) { j := &JSON{} got, err := j.Decode(tt.octets) - if (err != nil) != tt.wantErr { - t.Errorf("JSON.Decode() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !cmp.Equal(tt.want, got) { - t.Errorf("JSON.Decode() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } @@ -502,13 +501,12 @@ func Test_span_Trace(t *testing.T) { TraceID: tt.TraceID, } got, err := s.Trace() - if (err != nil) != tt.wantErr { - t.Errorf("span.Trace() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !cmp.Equal(tt.want, got) { - t.Errorf("span.Trace() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } @@ -552,13 +550,12 @@ func Test_span_SpanID(t *testing.T) { ID: tt.ID, } got, err := s.SpanID() - if (err != nil) != tt.wantErr { - t.Errorf("span.SpanID() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !cmp.Equal(tt.want, got) { - t.Errorf("span.SpanID() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } @@ -597,13 +594,12 @@ func Test_span_Parent(t *testing.T) { ParentID: tt.ParentID, } got, err := s.Parent() - if (err != nil) != tt.wantErr { - t.Errorf("span.Parent() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !cmp.Equal(tt.want, got) { - t.Errorf("span.Parent() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } @@ -630,9 +626,7 @@ func Test_span_Timestamp(t *testing.T) { s := &span{ Time: tt.Time, } - if got := s.Timestamp(); !cmp.Equal(tt.want, got) { - t.Errorf("span.Timestamp() = got(-)/want(+) %s", cmp.Diff(tt.want, got)) - } + require.Equal(t, tt.want, s.Timestamp()) }) } } @@ -659,9 +653,7 @@ func Test_span_Duration(t *testing.T) { s := &span{ Dur: tt.dur, } - if got := s.Duration(); got != tt.want { - t.Errorf("span.Duration() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, s.Duration()) }) } } @@ -703,15 +695,9 @@ func Test_annotation(t *testing.T) { t.Run(tt.name, func(t *testing.T) { an := annotation(tt.fields) a := &an - if got := a.Timestamp(); !got.Equal(tt.tm) { - t.Errorf("annotation.Timestamp() = %v, want %v", got, tt.tm) - } - if got := a.Value(); got != tt.val { - t.Errorf("annotation.Value() = %v, want %v", got, tt.val) - } - if got := a.Host(); !cmp.Equal(tt.endpoint, got) { - t.Errorf("annotation.Endpoint() = %v, want %v", got, tt.endpoint) - } + require.Equal(t, tt.tm, a.Timestamp()) + require.Equal(t, tt.val, a.Value()) + require.Equal(t, tt.endpoint, a.Host()) }) } } @@ -754,15 +740,9 @@ func Test_binaryAnnotation(t *testing.T) { t.Run(tt.name, func(t *testing.T) { bin := binaryAnnotation(tt.fields) b := &bin - if got := b.Key(); got != tt.key { - t.Errorf("binaryAnnotation.Key() = %v, want %v", got, tt.key) - } - if got := b.Value(); got != tt.value { - t.Errorf("binaryAnnotation.Value() = %v, want %v", got, tt.value) - } - if got := b.Host(); !cmp.Equal(tt.endpoint, got) { - t.Errorf("binaryAnnotation.Endpoint() = %v, want %v", got, tt.endpoint) - } + require.Equal(t, tt.key, b.Key()) + require.Equal(t, tt.value, b.Value()) + require.Equal(t, tt.endpoint, b.Host()) }) } } @@ -799,9 +779,7 @@ func Test_endpoint_Host(t *testing.T) { Ipv4: tt.fields.Ipv4, Port: tt.fields.Port, } - if got := e.Host(); got != tt.want { - t.Errorf("endpoint.Host() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, e.Host()) }) } } @@ -823,9 +801,7 @@ func Test_endpoint_Name(t *testing.T) { e := &endpoint{ ServiceName: tt.ServiceName, } - if got := e.Name(); got != tt.want { - t.Errorf("endpoint.Name() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, e.Name()) }) } } @@ -870,14 +846,13 @@ func TestTraceIDFromString(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := TraceIDFromString(tt.s) - if (err != nil) != tt.wantErr { - t.Errorf("TraceIDFromString() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("TraceIDFromString() = %v, want %v", got, tt.want) + got, err := traceIDFromString(tt.s) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } @@ -907,14 +882,13 @@ func TestIDFromString(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := IDFromString(tt.s) - if (err != nil) != tt.wantErr { - t.Errorf("IDFromString() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("IDFromString() = %v, want %v", got, tt.want) + got, err := idFromString(tt.s) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } diff --git a/plugins/inputs/zipkin/codec/thrift/thrift.go b/plugins/inputs/zipkin/codec/thrift/thrift.go index 2cc226d43..8dbb94e4e 100644 --- a/plugins/inputs/zipkin/codec/thrift/thrift.go +++ b/plugins/inputs/zipkin/codec/thrift/thrift.go @@ -15,8 +15,25 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift/gen-go/zipkincore" ) -// UnmarshalThrift converts raw bytes in thrift format to a slice of spans -func UnmarshalThrift(body []byte) ([]*zipkincore.Span, error) { +// Thrift decodes binary data to create a Trace +type Thrift struct{} + +// Decode unmarshals and validates bytes in thrift format +func (*Thrift) Decode(octets []byte) ([]codec.Span, error) { + spans, err := unmarshalThrift(octets) + if err != nil { + return nil, err + } + + res := make([]codec.Span, 0, len(spans)) + for _, s := range spans { + res = append(res, &span{s}) + } + return res, nil +} + +// unmarshalThrift converts raw bytes in thrift format to a slice of spans +func unmarshalThrift(body []byte) ([]*zipkincore.Span, error) { buffer := thrift.NewTMemoryBuffer() buffer.Write(body) @@ -41,23 +58,6 @@ func UnmarshalThrift(body []byte) ([]*zipkincore.Span, error) { return spans, nil } -// Thrift decodes binary data to create a Trace -type Thrift struct{} - -// Decode unmarshals and validates bytes in thrift format -func (*Thrift) Decode(octets []byte) ([]codec.Span, error) { - spans, err := UnmarshalThrift(octets) - if err != nil { - return nil, err - } - - res := make([]codec.Span, 0, len(spans)) - for _, s := range spans { - res = append(res, &span{s}) - } - return res, nil -} - var _ codec.Endpoint = &endpoint{} type endpoint struct { diff --git a/plugins/inputs/zipkin/codec/thrift/thrift_test.go b/plugins/inputs/zipkin/codec/thrift/thrift_test.go index 642142605..82d93f675 100644 --- a/plugins/inputs/zipkin/codec/thrift/thrift_test.go +++ b/plugins/inputs/zipkin/codec/thrift/thrift_test.go @@ -4,7 +4,7 @@ import ( "os" "testing" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift/gen-go/zipkincore" ) @@ -49,9 +49,7 @@ func Test_endpointHost(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { e := endpoint{tt.args.h} - if got := e.Host(); got != tt.want { - t.Errorf("host() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, e.Host()) }) } } @@ -85,9 +83,7 @@ func Test_endpointName(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { e := endpoint{tt.args.h} - if got := e.Name(); got != tt.want { - t.Errorf("serviceName() = %v, want %v", got, tt.want) - } + require.Equal(t, tt.want, e.Name()) }) } } @@ -198,14 +194,13 @@ func TestUnmarshalThrift(t *testing.T) { t.Fatalf("Could not find file %s\n", tt.filename) } - got, err := UnmarshalThrift(dat) - if (err != nil) != tt.wantErr { - t.Errorf("UnmarshalThrift() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !cmp.Equal(tt.want, got) { - t.Errorf("UnmarshalThrift() got(-)/want(+): %s", cmp.Diff(tt.want, got)) + got, err := unmarshalThrift(dat) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tt.want, got) }) } } diff --git a/plugins/inputs/zipkin/convert.go b/plugins/inputs/zipkin/convert.go index ae087645f..e4d6e1feb 100644 --- a/plugins/inputs/zipkin/convert.go +++ b/plugins/inputs/zipkin/convert.go @@ -7,25 +7,22 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" ) -// LineProtocolConverter implements the Recorder interface; it is a -// type meant to encapsulate the storage of zipkin tracing data in -// telegraf as line protocol. -type LineProtocolConverter struct { +// lineProtocolConverter implements the recorder interface; +// it is a type meant to encapsulate the storage of zipkin tracing data in telegraf as line protocol. +type lineProtocolConverter struct { acc telegraf.Accumulator } -// NewLineProtocolConverter returns an instance of LineProtocolConverter that -// will add to the given telegraf.Accumulator -func NewLineProtocolConverter(acc telegraf.Accumulator) *LineProtocolConverter { - return &LineProtocolConverter{ +// newLineProtocolConverter returns an instance of lineProtocolConverter that will add to the given telegraf.Accumulator +func newLineProtocolConverter(acc telegraf.Accumulator) *lineProtocolConverter { + return &lineProtocolConverter{ acc: acc, } } -// Record is LineProtocolConverter's implementation of the Record method of -// the Recorder interface; it takes a trace as input, and adds it to an internal -// telegraf.Accumulator. -func (l *LineProtocolConverter) Record(t trace.Trace) error { +// record is lineProtocolConverter's implementation of the record method of the recorder interface; +// it takes a trace as input, and adds it to an internal telegraf.Accumulator. +func (l *lineProtocolConverter) record(t trace.Trace) error { for _, s := range t { fields := map[string]interface{}{ "duration_ns": s.Duration.Nanoseconds(), @@ -71,12 +68,11 @@ func (l *LineProtocolConverter) Record(t trace.Trace) error { return nil } -func (l *LineProtocolConverter) Error(err error) { +func (l *lineProtocolConverter) error(err error) { l.acc.AddError(err) } -// formatName formats name and service name -// Zipkin forces span and service names to be lowercase: +// formatName formats name and service name Zipkin forces span and service names to be lowercase: // https://github.com/openzipkin/zipkin/pull/805 func formatName(name string) string { return strings.ToLower(name) diff --git a/plugins/inputs/zipkin/convert_test.go b/plugins/inputs/zipkin/convert_test.go index 4dc0d2baf..69aa2f2dc 100644 --- a/plugins/inputs/zipkin/convert_test.go +++ b/plugins/inputs/zipkin/convert_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" @@ -325,22 +325,24 @@ func TestLineProtocolConverter_Record(t *testing.T) { }, }, } - for i, tt := range tests { + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockAcc.ClearMetrics() - l := &LineProtocolConverter{ + l := &lineProtocolConverter{ acc: tt.fields.acc, } - if err := l.Record(tt.args.t); (err != nil) != tt.wantErr { - t.Errorf("LineProtocolConverter.Record() error = %v, wantErr %v", err, tt.wantErr) + err := l.record(tt.args.t) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) } + got := make([]testutil.Metric, 0, len(mockAcc.Metrics)) for _, metric := range mockAcc.Metrics { got = append(got, *metric) } - if !cmp.Equal(got, tt.want) { - t.Errorf("LineProtocolConverter.Record()/%s/%d error = %s ", tt.name, i, cmp.Diff(got, tt.want)) - } + require.Equal(t, tt.want, got) }) } } diff --git a/plugins/inputs/zipkin/handler.go b/plugins/inputs/zipkin/handler.go index 1094b2588..affd425a1 100644 --- a/plugins/inputs/zipkin/handler.go +++ b/plugins/inputs/zipkin/handler.go @@ -15,17 +15,15 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/zipkin/codec/thrift" ) -// SpanHandler is an implementation of a Handler which accepts zipkin thrift -// span data and sends it to the recorder -type SpanHandler struct { - Path string - recorder Recorder +// spanHandler is an implementation of a handler which accepts zipkin thrift span data and sends it to the recorder +type spanHandler struct { + path string + recorder recorder } -// NewSpanHandler returns a new server instance given path to handle -func NewSpanHandler(path string) *SpanHandler { - return &SpanHandler{ - Path: path, +func newSpanHandler(path string) *spanHandler { + return &spanHandler{ + path: path, } } @@ -58,17 +56,17 @@ func cors(next http.HandlerFunc) http.HandlerFunc { } } -// Register implements the Service interface. Register accepts zipkin thrift data +// register implements the Service interface. Register accepts zipkin thrift data // POSTed to the path of the mux router -func (s *SpanHandler) Register(router *mux.Router, recorder Recorder) error { - handler := cors(http.HandlerFunc(s.Spans)) - router.Handle(s.Path, handler).Methods("POST", "OPTIONS") +func (s *spanHandler) register(router *mux.Router, recorder recorder) error { + handler := cors(http.HandlerFunc(s.spans)) + router.Handle(s.path, handler).Methods("POST", "OPTIONS") s.recorder = recorder return nil } -// Spans handles zipkin thrift spans -func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { +// spans handles zipkin thrift spans +func (s *spanHandler) spans(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() body := r.Body var err error @@ -76,42 +74,42 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Content-Encoding") == "gzip" { body, err = gzip.NewReader(r.Body) if err != nil { - s.recorder.Error(err) + s.recorder.error(err) w.WriteHeader(http.StatusInternalServerError) return } defer body.Close() } - decoder, err := ContentDecoder(r) + decoder, err := contentDecoder(r) if err != nil { - s.recorder.Error(err) + s.recorder.error(err) w.WriteHeader(http.StatusUnsupportedMediaType) } octets, err := io.ReadAll(body) if err != nil { - s.recorder.Error(err) + s.recorder.error(err) w.WriteHeader(http.StatusInternalServerError) return } spans, err := decoder.Decode(octets) if err != nil { - s.recorder.Error(err) + s.recorder.error(err) w.WriteHeader(http.StatusBadRequest) return } trace, err := codec.NewTrace(spans) if err != nil { - s.recorder.Error(err) + s.recorder.error(err) w.WriteHeader(http.StatusBadRequest) return } - if err = s.recorder.Record(trace); err != nil { - s.recorder.Error(err) + if err = s.recorder.record(trace); err != nil { + s.recorder.error(err) w.WriteHeader(http.StatusInternalServerError) return } @@ -119,10 +117,10 @@ func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -// ContentDecoder returns a Decoder that is able to produce Traces from bytes. +// contentDecoder returns a Decoder that is able to produce Traces from bytes. // Failure should yield an HTTP 415 (`http.StatusUnsupportedMediaType`) // If a Content-Type is not set, zipkin assumes application/json -func ContentDecoder(r *http.Request) (codec.Decoder, error) { +func contentDecoder(r *http.Request) (codec.Decoder, error) { contentType := r.Header.Get("Content-Type") if contentType == "" { return &json_v1.JSON{}, nil diff --git a/plugins/inputs/zipkin/handler_test.go b/plugins/inputs/zipkin/handler_test.go index 455fa7896..2f4dc98dd 100644 --- a/plugins/inputs/zipkin/handler_test.go +++ b/plugins/inputs/zipkin/handler_test.go @@ -10,23 +10,23 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/inputs/zipkin/trace" ) -type MockRecorder struct { - Data trace.Trace - Err error +type mockRecorder struct { + data trace.Trace + err error } -func (m *MockRecorder) Record(t trace.Trace) error { - m.Data = t +func (m *mockRecorder) record(t trace.Trace) error { + m.data = t return nil } -func (m *MockRecorder) Error(err error) { - m.Err = err +func (m *mockRecorder) error(err error) { + m.err = err } func TestSpanHandler(t *testing.T) { @@ -43,16 +43,14 @@ func TestSpanHandler(t *testing.T) { bytes.NewReader(dat))) r.Header.Set("Content-Type", "application/x-thrift") - handler := NewSpanHandler("/api/v1/spans") - mockRecorder := &MockRecorder{} + handler := newSpanHandler("/api/v1/spans") + mockRecorder := &mockRecorder{} handler.recorder = mockRecorder - handler.Spans(w, r) - if w.Code != http.StatusNoContent { - t.Errorf("MainHandler did not return StatusNoContent %d", w.Code) - } + handler.spans(w, r) + require.Equal(t, http.StatusNoContent, w.Code) - got := mockRecorder.Data + got := mockRecorder.data parentID := strconv.FormatInt(22964302721410078, 16) want := trace.Trace{ @@ -131,7 +129,5 @@ func TestSpanHandler(t *testing.T) { }, } - if !cmp.Equal(got, want) { - t.Fatalf("Got != Want\n %s", cmp.Diff(got, want)) - } + require.Equal(t, want, got) } diff --git a/plugins/inputs/zipkin/zipkin.go b/plugins/inputs/zipkin/zipkin.go index 3efe8f7fd..c630f1e57 100644 --- a/plugins/inputs/zipkin/zipkin.go +++ b/plugins/inputs/zipkin/zipkin.go @@ -22,6 +22,11 @@ import ( //go:embed sample.conf var sampleConfig string +var ( + // defaultNetwork is the network to listen on; use only in tests. + defaultNetwork = "tcp" +) + const ( // defaultPort is the default port zipkin listens on, which zipkin implementations expect. defaultPort = 9411 @@ -36,51 +41,35 @@ const ( defaultWriteTimeout = 10 * time.Second ) -var ( - // defaultNetwork is the network to listen on; use only in tests. - defaultNetwork = "tcp" -) - -// Recorder represents a type which can record zipkin trace data as well as -// any accompanying errors, and process that data. -type Recorder interface { - Record(trace.Trace) error - Error(error) -} - -// Handler represents a type which can register itself with a router for -// http routing, and a Recorder for trace data collection. -type Handler interface { - Register(router *mux.Router, recorder Recorder) error -} - -// Zipkin is a telegraf configuration structure for the zipkin input plugin, -// but it also contains fields for the management of a separate, concurrent -// zipkin http server type Zipkin struct { Port int `toml:"port"` Path string `toml:"path"` ReadTimeout config.Duration `toml:"read_timeout"` WriteTimeout config.Duration `toml:"write_timeout"` - Log telegraf.Logger + Log telegraf.Logger `toml:"-"` address string - handler Handler + handler handler server *http.Server waitGroup *sync.WaitGroup } +// recorder represents a type which can record zipkin trace data as well as any accompanying errors, and process that data. +type recorder interface { + record(trace.Trace) error + error(error) +} + +// handler represents a type which can register itself with a router for http routing, and a recorder for trace data collection. +type handler interface { + register(router *mux.Router, recorder recorder) error +} + func (*Zipkin) SampleConfig() string { return sampleConfig } -// Gather is empty for the zipkin plugin; all gathering is done through -// the separate goroutine launched in (*Zipkin).Start() -func (*Zipkin) Gather(telegraf.Accumulator) error { return nil } - -// Start launches a separate goroutine for collecting zipkin client http requests, -// passing in a telegraf.Accumulator such that data can be collected. func (z *Zipkin) Start(acc telegraf.Accumulator) error { if z.ReadTimeout < config.Duration(time.Second) { z.ReadTimeout = config.Duration(defaultReadTimeout) @@ -89,14 +78,14 @@ func (z *Zipkin) Start(acc telegraf.Accumulator) error { z.WriteTimeout = config.Duration(defaultWriteTimeout) } - z.handler = NewSpanHandler(z.Path) + z.handler = newSpanHandler(z.Path) var wg sync.WaitGroup z.waitGroup = &wg router := mux.NewRouter() - converter := NewLineProtocolConverter(acc) - if err := z.handler.Register(router, converter); err != nil { + converter := newLineProtocolConverter(acc) + if err := z.handler.register(router, converter); err != nil { return err } @@ -119,13 +108,14 @@ func (z *Zipkin) Start(acc telegraf.Accumulator) error { go func() { defer wg.Done() - z.Listen(ln, acc) + z.listen(ln, acc) }() return nil } -// Stop shuts the internal http server down with via context.Context +func (*Zipkin) Gather(telegraf.Accumulator) error { return nil } + func (z *Zipkin) Stop() { ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) @@ -135,9 +125,9 @@ func (z *Zipkin) Stop() { z.server.Shutdown(ctx) //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway } -// Listen creates a http server on the zipkin instance it is called with, and +// listen creates a http server on the zipkin instance it is called with, and // serves http until it is stopped by Zipkin's (*Zipkin).Stop() method. -func (z *Zipkin) Listen(ln net.Listener, acc telegraf.Accumulator) { +func (z *Zipkin) listen(ln net.Listener, acc telegraf.Accumulator) { if err := z.server.Serve(ln); err != nil { // Because of the clean shutdown in `(*Zipkin).Stop()` // We're expecting a server closed error at some point diff --git a/plugins/inputs/zipkin/zipkin_test.go b/plugins/inputs/zipkin/zipkin_test.go index 9eb5f34cc..0dbd741f1 100644 --- a/plugins/inputs/zipkin/zipkin_test.go +++ b/plugins/inputs/zipkin/zipkin_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" @@ -626,9 +626,8 @@ func TestZipkinPlugin(t *testing.T) { for _, m := range mockAcc.Metrics { got = append(got, *m) } - if !cmp.Equal(tt.want, got) { - t.Fatalf("Got != Want\n %s", cmp.Diff(tt.want, got)) - } + + require.Equal(t, tt.want, got) }) } mockAcc.ClearMetrics() diff --git a/plugins/inputs/zookeeper/zookeeper.go b/plugins/inputs/zookeeper/zookeeper.go index 6f48a012a..02d2005a8 100644 --- a/plugins/inputs/zookeeper/zookeeper.go +++ b/plugins/inputs/zookeeper/zookeeper.go @@ -24,7 +24,6 @@ var sampleConfig string var zookeeperFormatRE = regexp.MustCompile(`^zk_(\w[\w\.\-]*)\s+([\w\.\-]+)`) -// Zookeeper is a zookeeper plugin type Zookeeper struct { Servers []string `toml:"servers"` Timeout config.Duration `toml:"timeout"` @@ -38,25 +37,10 @@ type Zookeeper struct { tlsConfig *tls.Config } -var defaultTimeout = 5 * time.Second - -func (z *Zookeeper) dial(ctx context.Context, addr string) (net.Conn, error) { - var dialer net.Dialer - if z.EnableTLS || z.EnableSSL { - deadline, ok := ctx.Deadline() - if ok { - dialer.Deadline = deadline - } - return tls.DialWithDialer(&dialer, "tcp", addr, z.tlsConfig) - } - return dialer.DialContext(ctx, "tcp", addr) -} - func (*Zookeeper) SampleConfig() string { return sampleConfig } -// Gather reads stats from all configured servers accumulates stats func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { ctx := context.Background() @@ -70,7 +54,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { } if z.Timeout < config.Duration(1*time.Second) { - z.Timeout = config.Duration(defaultTimeout) + z.Timeout = config.Duration(5 * time.Second) } ctx, cancel := context.WithTimeout(ctx, time.Duration(z.Timeout)) @@ -170,6 +154,18 @@ func (z *Zookeeper) gatherServer(ctx context.Context, address string, acc telegr return nil } +func (z *Zookeeper) dial(ctx context.Context, addr string) (net.Conn, error) { + var dialer net.Dialer + if z.EnableTLS || z.EnableSSL { + deadline, ok := ctx.Deadline() + if ok { + dialer.Deadline = deadline + } + return tls.DialWithDialer(&dialer, "tcp", addr, z.tlsConfig) + } + return dialer.DialContext(ctx, "tcp", addr) +} + func init() { inputs.Add("zookeeper", func() telegraf.Input { return &Zookeeper{}