chore: Fix linter findings for `revive:exported` in `plugins/inputs/[a-b]*` (#15913)

This commit is contained in:
Paweł Żak 2024-10-09 16:38:22 +02:00 committed by GitHub
parent 438653591b
commit f4f7a63860
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 585 additions and 598 deletions

View File

@ -87,22 +87,6 @@ type stats struct {
DequeueCounter int `xml:"dequeueCounter,attr"`
}
func (a *ActiveMQ) createHTTPClient() (*http.Client, error) {
tlsCfg, err := a.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: time.Duration(a.ResponseTimeout),
}
return client, nil
}
func (*ActiveMQ) SampleConfig() string {
return sampleConfig
}
@ -138,7 +122,61 @@ func (a *ActiveMQ) Init() error {
return nil
}
func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) {
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
dataQueues, err := a.getMetrics(a.queuesURL())
if err != nil {
return err
}
queues := queues{}
err = xml.Unmarshal(dataQueues, &queues)
if err != nil {
return fmt.Errorf("queues XML unmarshal error: %w", err)
}
dataTopics, err := a.getMetrics(a.topicsURL())
if err != nil {
return err
}
topics := topics{}
err = xml.Unmarshal(dataTopics, &topics)
if err != nil {
return fmt.Errorf("topics XML unmarshal error: %w", err)
}
dataSubscribers, err := a.getMetrics(a.subscribersURL())
if err != nil {
return err
}
subscribers := subscribers{}
err = xml.Unmarshal(dataSubscribers, &subscribers)
if err != nil {
return fmt.Errorf("subscribers XML unmarshal error: %w", err)
}
a.gatherQueuesMetrics(acc, queues)
a.gatherTopicsMetrics(acc, topics)
a.gatherSubscribersMetrics(acc, subscribers)
return nil
}
func (a *ActiveMQ) createHTTPClient() (*http.Client, error) {
tlsCfg, err := a.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: time.Duration(a.ResponseTimeout),
}
return client, nil
}
func (a *ActiveMQ) getMetrics(u string) ([]byte, error) {
req, err := http.NewRequest("GET", u, nil)
if err != nil {
return nil, err
@ -161,7 +199,7 @@ func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) {
return io.ReadAll(resp.Body)
}
func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues queues) {
func (a *ActiveMQ) gatherQueuesMetrics(acc telegraf.Accumulator, queues queues) {
for _, queue := range queues.QueueItems {
records := make(map[string]interface{})
tags := make(map[string]string)
@ -179,7 +217,7 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues queues)
}
}
func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics topics) {
func (a *ActiveMQ) gatherTopicsMetrics(acc telegraf.Accumulator, topics topics) {
for _, topic := range topics.TopicItems {
records := make(map[string]interface{})
tags := make(map[string]string)
@ -197,7 +235,7 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics topics)
}
}
func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers subscribers) {
func (a *ActiveMQ) gatherSubscribersMetrics(acc telegraf.Accumulator, subscribers subscribers) {
for _, subscriber := range subscribers.SubscriberItems {
records := make(map[string]interface{})
tags := make(map[string]string)
@ -221,55 +259,17 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber
}
}
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
dataQueues, err := a.GetMetrics(a.QueuesURL())
if err != nil {
return err
}
queues := queues{}
err = xml.Unmarshal(dataQueues, &queues)
if err != nil {
return fmt.Errorf("queues XML unmarshal error: %w", err)
}
dataTopics, err := a.GetMetrics(a.TopicsURL())
if err != nil {
return err
}
topics := topics{}
err = xml.Unmarshal(dataTopics, &topics)
if err != nil {
return fmt.Errorf("topics XML unmarshal error: %w", err)
}
dataSubscribers, err := a.GetMetrics(a.SubscribersURL())
if err != nil {
return err
}
subscribers := subscribers{}
err = xml.Unmarshal(dataSubscribers, &subscribers)
if err != nil {
return fmt.Errorf("subscribers XML unmarshal error: %w", err)
}
a.GatherQueuesMetrics(acc, queues)
a.GatherTopicsMetrics(acc, topics)
a.GatherSubscribersMetrics(acc, subscribers)
return nil
}
func (a *ActiveMQ) QueuesURL() string {
func (a *ActiveMQ) queuesURL() string {
ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/queues.jsp")}
return a.baseURL.ResolveReference(&ref).String()
}
func (a *ActiveMQ) TopicsURL() string {
func (a *ActiveMQ) topicsURL() string {
ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/topics.jsp")}
return a.baseURL.ResolveReference(&ref).String()
}
func (a *ActiveMQ) SubscribersURL() string {
func (a *ActiveMQ) subscribersURL() string {
ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/subscribers.jsp")}
return a.baseURL.ResolveReference(&ref).String()
}

View File

@ -52,7 +52,7 @@ func TestGatherQueuesMetrics(t *testing.T) {
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
plugin.GatherQueuesMetrics(&acc, queues)
plugin.gatherQueuesMetrics(&acc, queues)
acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
}
@ -98,7 +98,7 @@ func TestGatherTopicsMetrics(t *testing.T) {
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
plugin.GatherTopicsMetrics(&acc, topics)
plugin.gatherTopicsMetrics(&acc, topics)
acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
}
@ -137,7 +137,7 @@ func TestGatherSubscribersMetrics(t *testing.T) {
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
plugin.GatherSubscribersMetrics(&acc, subscribers)
plugin.gatherSubscribersMetrics(&acc, subscribers)
acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
}

View File

@ -27,7 +27,6 @@ import (
var sampleConfig string
type (
// AliyunCMS is aliyun cms config info.
AliyunCMS struct {
AccessKeyID string `toml:"access_key_id"`
AccessKeySecret string `toml:"access_key_secret"`
@ -43,7 +42,7 @@ type (
Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"`
Project string `toml:"project"`
Metrics []*Metric `toml:"metrics"`
Metrics []*metric `toml:"metrics"`
RateLimit int `toml:"ratelimit"`
Log telegraf.Logger `toml:"-"`
@ -57,8 +56,8 @@ type (
measurement string
}
// Metric describes what metrics to get
Metric struct {
// metric describes what metrics to get
metric struct {
ObjectsFilter string `toml:"objects_filter"`
MetricNames []string `toml:"names"`
Dimensions string `toml:"dimensions"` // String representation of JSON dimensions
@ -74,11 +73,6 @@ type (
}
// Dimension describe how to get metrics
Dimension struct {
Value string `toml:"value"`
}
aliyuncmsClient interface {
DescribeMetricList(request *cms.DescribeMetricListRequest) (response *cms.DescribeMetricListResponse, err error)
}
@ -113,7 +107,6 @@ func (*AliyunCMS) SampleConfig() string {
return sampleConfig
}
// Init perform checks of plugin inputs and initialize internals
func (s *AliyunCMS) Init() error {
if s.Project == "" {
return errors.New("project is not set")
@ -216,7 +209,6 @@ func (s *AliyunCMS) Start(telegraf.Accumulator) error {
return nil
}
// Gather implements telegraf.Inputs interface
func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error {
s.updateWindow(time.Now())
@ -225,16 +217,16 @@ func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error {
defer lmtr.Stop()
var wg sync.WaitGroup
for _, metric := range s.Metrics {
for _, m := range s.Metrics {
// Prepare internal structure with data from discovery
s.prepareTagsAndDimensions(metric)
wg.Add(len(metric.MetricNames))
for _, metricName := range metric.MetricNames {
s.prepareTagsAndDimensions(m)
wg.Add(len(m.MetricNames))
for _, metricName := range m.MetricNames {
<-lmtr.C
go func(metricName string, metric *Metric) {
go func(metricName string, m *metric) {
defer wg.Done()
acc.AddError(s.gatherMetric(acc, metricName, metric))
}(metricName, metric)
acc.AddError(s.gatherMetric(acc, metricName, m))
}(metricName, m)
}
wg.Wait()
}
@ -269,7 +261,7 @@ func (s *AliyunCMS) updateWindow(relativeTo time.Time) {
}
// Gather given metric and emit error
func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *Metric) error {
func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *metric) error {
for _, region := range s.Regions {
req := cms.CreateDescribeMetricListRequest()
req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10)
@ -372,7 +364,7 @@ func parseTag(tagSpec string, data interface{}) (tagKey, tagValue string, err er
return tagKey, tagValue, nil
}
func (s *AliyunCMS) prepareTagsAndDimensions(metric *Metric) {
func (s *AliyunCMS) prepareTagsAndDimensions(metric *metric) {
var (
newData bool
defaultTags = []string{"RegionId:RegionId"}

View File

@ -240,7 +240,7 @@ func TestPluginMetricsInitialize(t *testing.T) {
expectedErrorString string
regions []string
discoveryRegions []string
metrics []*Metric
metrics []*metric
}{
{
name: "Valid project",
@ -248,7 +248,7 @@ func TestPluginMetricsInitialize(t *testing.T) {
regions: []string{"cn-shanghai"},
accessKeyID: "dummy",
accessKeySecret: "dummy",
metrics: []*Metric{
metrics: []*metric{
{
MetricNames: []string{},
Dimensions: `{"instanceId": "i-abcdefgh123456"}`,
@ -261,7 +261,7 @@ func TestPluginMetricsInitialize(t *testing.T) {
regions: []string{"cn-shanghai"},
accessKeyID: "dummy",
accessKeySecret: "dummy",
metrics: []*Metric{
metrics: []*metric{
{
MetricNames: []string{},
Dimensions: `[{"instanceId": "p-example"},{"instanceId": "q-example"}]`,
@ -275,7 +275,7 @@ func TestPluginMetricsInitialize(t *testing.T) {
accessKeyID: "dummy",
accessKeySecret: "dummy",
expectedErrorString: `cannot parse dimensions (neither obj, nor array) "[": unexpected end of JSON input`,
metrics: []*Metric{
metrics: []*metric{
{
MetricNames: []string{},
Dimensions: `[`,
@ -343,7 +343,7 @@ func TestGatherMetric(t *testing.T) {
Regions: []string{"cn-shanghai"},
}
metric := &Metric{
metric := &metric{
MetricNames: []string{},
Dimensions: `"instanceId": "i-abcdefgh123456"`,
}
@ -374,7 +374,7 @@ func TestGatherMetric(t *testing.T) {
}
func TestGather(t *testing.T) {
metric := &Metric{
m := &metric{
MetricNames: []string{},
Dimensions: `{"instanceId": "i-abcdefgh123456"}`,
}
@ -382,7 +382,7 @@ func TestGather(t *testing.T) {
AccessKeyID: "my_access_key_id",
AccessKeySecret: "my_access_key_secret",
Project: "acs_slb_dashboard",
Metrics: []*Metric{metric},
Metrics: []*metric{m},
RateLimit: 200,
measurement: formatMeasurement("acs_slb_dashboard"),
Regions: []string{"cn-shanghai"},

View File

@ -395,8 +395,7 @@ func (dt *discoveryTool) getDiscoveryDataAcrossRegions(lmtr chan bool) (map[stri
return resultData, nil
}
// start the discovery pooling
// In case smth. new found it will be reported back through `DataChan`
// start the discovery pooling; in case something new is found, it will be reported back through `dataChan`
func (dt *discoveryTool) start() {
var (
err error
@ -443,8 +442,7 @@ func (dt *discoveryTool) start() {
}()
}
// stop the discovery loop, making sure
// all data is read from 'dataChan'
// stop the discovery loop, making sure all data is read from 'dataChan'
func (dt *discoveryTool) stop() {
close(dt.done)

View File

@ -28,215 +28,7 @@ type ROCmSMI struct {
Log telegraf.Logger `toml:"-"`
}
func (*ROCmSMI) SampleConfig() string {
return sampleConfig
}
// Gather implements the telegraf interface
func (rsmi *ROCmSMI) Gather(acc telegraf.Accumulator) error {
data, err := rsmi.pollROCmSMI()
if err != nil {
return fmt.Errorf("failed to execute command in pollROCmSMI: %w", err)
}
return gatherROCmSMI(data, acc)
}
func (rsmi *ROCmSMI) Start(telegraf.Accumulator) error {
if _, err := os.Stat(rsmi.BinPath); os.IsNotExist(err) {
binPath, err := exec.LookPath("rocm-smi")
if err != nil {
return &internal.StartupError{Err: err}
}
rsmi.BinPath = binPath
}
return nil
}
func (rsmi *ROCmSMI) Stop() {}
func init() {
inputs.Add("amd_rocm_smi", func() telegraf.Input {
return &ROCmSMI{
BinPath: "/opt/rocm/bin/rocm-smi",
Timeout: config.Duration(5 * time.Second),
}
})
}
func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) {
// Construct and execute metrics query, there currently exist (ROCm v4.3.x) a "-a" option
// that does not provide all the information, so each needed parameter is set manually
cmd := exec.Command(rsmi.BinPath,
"-o",
"-l",
"-m",
"-M",
"-g",
"-c",
"-t",
"-u",
"-i",
"-f",
"-p",
"-P",
"-s",
"-S",
"-v",
"--showreplaycount",
"--showpids",
"--showdriverversion",
"--showmemvendor",
"--showfwinfo",
"--showproductname",
"--showserial",
"--showuniqueid",
"--showbus",
"--showpendingpages",
"--showpagesinfo",
"--showmeminfo",
"all",
"--showretiredpages",
"--showunreservablepages",
"--showmemuse",
"--showvoltage",
"--showtopo",
"--showtopoweight",
"--showtopohops",
"--showtopotype",
"--showtoponuma",
"--json")
return internal.StdOutputTimeout(cmd, time.Duration(rsmi.Timeout))
}
func gatherROCmSMI(ret []byte, acc telegraf.Accumulator) error {
var gpus map[string]GPU
var sys map[string]sysInfo
err1 := json.Unmarshal(ret, &gpus)
if err1 != nil {
return err1
}
err2 := json.Unmarshal(ret, &sys)
if err2 != nil {
return err2
}
metrics := genTagsFields(gpus, sys)
for _, metric := range metrics {
acc.AddFields(measurement, metric.fields, metric.tags)
}
return nil
}
type metric struct {
tags map[string]string
fields map[string]interface{}
}
func genTagsFields(gpus map[string]GPU, system map[string]sysInfo) []metric {
metrics := []metric{}
for cardID := range gpus {
if strings.Contains(cardID, "card") {
tags := map[string]string{
"name": cardID,
}
fields := map[string]interface{}{}
payload := gpus[cardID]
//nolint:errcheck // silently treat as zero if malformed
totVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalMemory, 10, 64)
//nolint:errcheck // silently treat as zero if malformed
usdVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalUsedMemory, 10, 64)
strFree := strconv.FormatInt(totVRAM-usdVRAM, 10)
// Try using value found in Device ID first. If not found, try GPU
// ID for backwards compatibility.
setTagIfUsed(tags, "gpu_id", payload.DeviceID)
setTagIfUsed(tags, "gpu_id", payload.GpuID)
setTagIfUsed(tags, "gpu_unique_id", payload.GpuUniqueID)
setIfUsed("int", fields, "driver_version", strings.ReplaceAll(system["system"].DriverVersion, ".", ""))
setIfUsed("int", fields, "fan_speed", payload.GpuFanSpeedPercentage)
setIfUsed("int64", fields, "memory_total", payload.GpuVRAMTotalMemory)
setIfUsed("int64", fields, "memory_used", payload.GpuVRAMTotalUsedMemory)
setIfUsed("int64", fields, "memory_free", strFree)
setIfUsed("float", fields, "temperature_sensor_edge", payload.GpuTemperatureSensorEdge)
setIfUsed("float", fields, "temperature_sensor_junction", payload.GpuTemperatureSensorJunction)
setIfUsed("float", fields, "temperature_sensor_memory", payload.GpuTemperatureSensorMemory)
setIfUsed("int", fields, "utilization_gpu", payload.GpuUsePercentage)
// Try using allocated percentage first.
setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryAllocatedPercentage)
setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryUsePercentage)
setIfUsed("int", fields, "clocks_current_sm", strings.Trim(payload.GpuSclkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_memory", strings.Trim(payload.GpuMclkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_display", strings.Trim(payload.GpuDcefClkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_fabric", strings.Trim(payload.GpuFclkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_system", strings.Trim(payload.GpuSocclkClockSpeed, "(Mhz)"))
setIfUsed("float", fields, "power_draw", payload.GpuAveragePower)
setIfUsed("str", fields, "card_series", payload.GpuCardSeries)
setIfUsed("str", fields, "card_model", payload.GpuCardModel)
setIfUsed("str", fields, "card_vendor", payload.GpuCardVendor)
metrics = append(metrics, metric{tags, fields})
}
}
return metrics
}
func setTagIfUsed(m map[string]string, k, v string) {
if v != "" {
m[k] = v
}
}
func setIfUsed(t string, m map[string]interface{}, k, v string) {
vals := strings.Fields(v)
if len(vals) < 1 {
return
}
val := vals[0]
switch t {
case "float":
if val != "" {
f, err := strconv.ParseFloat(val, 64)
if err == nil {
m[k] = f
}
}
case "int":
if val != "" {
i, err := strconv.Atoi(val)
if err == nil {
m[k] = i
}
}
case "int64":
if val != "" {
i, err := strconv.ParseInt(val, 10, 64)
if err == nil {
m[k] = i
}
}
case "str":
if val != "" {
m[k] = val
}
}
}
type sysInfo struct {
DriverVersion string `json:"Driver version"`
}
type GPU struct {
type gpu struct {
DeviceID string `json:"Device ID"`
GpuID string `json:"GPU ID"`
GpuUniqueID string `json:"Unique ID"`
@ -304,3 +96,210 @@ type GPU struct {
GpuGTTTotalMemory string `json:"GTT Total Memory (B)"`
GpuGTTTotalUsedMemory string `json:"GTT Total Used Memory (B)"`
}
type sysInfo struct {
DriverVersion string `json:"Driver version"`
}
type metric struct {
tags map[string]string
fields map[string]interface{}
}
func (*ROCmSMI) SampleConfig() string {
return sampleConfig
}
func (rsmi *ROCmSMI) Start(telegraf.Accumulator) error {
if _, err := os.Stat(rsmi.BinPath); os.IsNotExist(err) {
binPath, err := exec.LookPath("rocm-smi")
if err != nil {
return &internal.StartupError{Err: err}
}
rsmi.BinPath = binPath
}
return nil
}
func (rsmi *ROCmSMI) Gather(acc telegraf.Accumulator) error {
data, err := rsmi.pollROCmSMI()
if err != nil {
return fmt.Errorf("failed to execute command in pollROCmSMI: %w", err)
}
return gatherROCmSMI(data, acc)
}
func (rsmi *ROCmSMI) Stop() {}
func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) {
// Construct and execute metrics query, there currently exist (ROCm v4.3.x) a "-a" option
// that does not provide all the information, so each needed parameter is set manually
cmd := exec.Command(rsmi.BinPath,
"-o",
"-l",
"-m",
"-M",
"-g",
"-c",
"-t",
"-u",
"-i",
"-f",
"-p",
"-P",
"-s",
"-S",
"-v",
"--showreplaycount",
"--showpids",
"--showdriverversion",
"--showmemvendor",
"--showfwinfo",
"--showproductname",
"--showserial",
"--showuniqueid",
"--showbus",
"--showpendingpages",
"--showpagesinfo",
"--showmeminfo",
"all",
"--showretiredpages",
"--showunreservablepages",
"--showmemuse",
"--showvoltage",
"--showtopo",
"--showtopoweight",
"--showtopohops",
"--showtopotype",
"--showtoponuma",
"--json")
return internal.StdOutputTimeout(cmd, time.Duration(rsmi.Timeout))
}
func genTagsFields(gpus map[string]gpu, system map[string]sysInfo) []metric {
metrics := []metric{}
for cardID := range gpus {
if strings.Contains(cardID, "card") {
tags := map[string]string{
"name": cardID,
}
fields := map[string]interface{}{}
payload := gpus[cardID]
//nolint:errcheck // silently treat as zero if malformed
totVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalMemory, 10, 64)
//nolint:errcheck // silently treat as zero if malformed
usdVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalUsedMemory, 10, 64)
strFree := strconv.FormatInt(totVRAM-usdVRAM, 10)
// Try using value found in Device ID first. If not found, try GPU
// ID for backwards compatibility.
setTagIfUsed(tags, "gpu_id", payload.DeviceID)
setTagIfUsed(tags, "gpu_id", payload.GpuID)
setTagIfUsed(tags, "gpu_unique_id", payload.GpuUniqueID)
setIfUsed("int", fields, "driver_version", strings.ReplaceAll(system["system"].DriverVersion, ".", ""))
setIfUsed("int", fields, "fan_speed", payload.GpuFanSpeedPercentage)
setIfUsed("int64", fields, "memory_total", payload.GpuVRAMTotalMemory)
setIfUsed("int64", fields, "memory_used", payload.GpuVRAMTotalUsedMemory)
setIfUsed("int64", fields, "memory_free", strFree)
setIfUsed("float", fields, "temperature_sensor_edge", payload.GpuTemperatureSensorEdge)
setIfUsed("float", fields, "temperature_sensor_junction", payload.GpuTemperatureSensorJunction)
setIfUsed("float", fields, "temperature_sensor_memory", payload.GpuTemperatureSensorMemory)
setIfUsed("int", fields, "utilization_gpu", payload.GpuUsePercentage)
// Try using allocated percentage first.
setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryAllocatedPercentage)
setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryUsePercentage)
setIfUsed("int", fields, "clocks_current_sm", strings.Trim(payload.GpuSclkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_memory", strings.Trim(payload.GpuMclkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_display", strings.Trim(payload.GpuDcefClkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_fabric", strings.Trim(payload.GpuFclkClockSpeed, "(Mhz)"))
setIfUsed("int", fields, "clocks_current_system", strings.Trim(payload.GpuSocclkClockSpeed, "(Mhz)"))
setIfUsed("float", fields, "power_draw", payload.GpuAveragePower)
setIfUsed("str", fields, "card_series", payload.GpuCardSeries)
setIfUsed("str", fields, "card_model", payload.GpuCardModel)
setIfUsed("str", fields, "card_vendor", payload.GpuCardVendor)
metrics = append(metrics, metric{tags, fields})
}
}
return metrics
}
func gatherROCmSMI(ret []byte, acc telegraf.Accumulator) error {
var gpus map[string]gpu
var sys map[string]sysInfo
err1 := json.Unmarshal(ret, &gpus)
if err1 != nil {
return err1
}
err2 := json.Unmarshal(ret, &sys)
if err2 != nil {
return err2
}
metrics := genTagsFields(gpus, sys)
for _, metric := range metrics {
acc.AddFields(measurement, metric.fields, metric.tags)
}
return nil
}
func setTagIfUsed(m map[string]string, k, v string) {
if v != "" {
m[k] = v
}
}
func setIfUsed(t string, m map[string]interface{}, k, v string) {
vals := strings.Fields(v)
if len(vals) < 1 {
return
}
val := vals[0]
switch t {
case "float":
if val != "" {
f, err := strconv.ParseFloat(val, 64)
if err == nil {
m[k] = f
}
}
case "int":
if val != "" {
i, err := strconv.Atoi(val)
if err == nil {
m[k] = i
}
}
case "int64":
if val != "" {
i, err := strconv.ParseInt(val, 10, 64)
if err == nil {
m[k] = i
}
}
case "str":
if val != "" {
m[k] = val
}
}
}
func init() {
inputs.Add("amd_rocm_smi", func() telegraf.Input {
return &ROCmSMI{
BinPath: "/opt/rocm/bin/rocm-smi",
Timeout: config.Duration(5 * time.Second),
}
})
}

View File

@ -26,9 +26,10 @@ var sampleConfig string
var once sync.Once
type empty struct{}
type externalAuth struct{}
type semaphore chan empty
// AMQPConsumer is the top level struct for this plugin
type AMQPConsumer struct {
URL string `toml:"url" deprecated:"1.7.0;1.35.0;use 'brokers' instead"`
Brokers []string `toml:"brokers"`
@ -62,11 +63,10 @@ type AMQPConsumer struct {
decoder internal.ContentDecoder
}
type externalAuth struct{}
func (a *externalAuth) Mechanism() string {
return "EXTERNAL"
}
func (a *externalAuth) Response() string {
return "\000"
}
@ -115,51 +115,6 @@ func (a *AMQPConsumer) SetParser(parser telegraf.Parser) {
a.parser = parser
}
// All gathering is done in the Start function
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
// make new tls config
tlsCfg, err := a.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
var auth []amqp.Authentication
if strings.EqualFold(a.AuthMethod, "EXTERNAL") {
auth = []amqp.Authentication{&externalAuth{}}
} else if !a.Username.Empty() || !a.Password.Empty() {
username, err := a.Username.Get()
if err != nil {
return nil, fmt.Errorf("getting username failed: %w", err)
}
defer username.Destroy()
password, err := a.Password.Get()
if err != nil {
return nil, fmt.Errorf("getting password failed: %w", err)
}
defer password.Destroy()
auth = []amqp.Authentication{
&amqp.PlainAuth{
Username: username.String(),
Password: password.String(),
},
}
}
amqpConfig := amqp.Config{
TLSClientConfig: tlsCfg,
SASL: auth, // if nil, it will be PLAIN
Dial: amqp.DefaultDial(time.Duration(a.Timeout)),
}
return &amqpConfig, nil
}
// Start satisfies the telegraf.ServiceInput interface
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
amqpConf, err := a.createConfig()
if err != nil {
@ -219,6 +174,63 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error {
return nil
}
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (a *AMQPConsumer) Stop() {
// We did not connect successfully so there is nothing to do here.
if a.conn == nil || a.conn.IsClosed() {
return
}
a.cancel()
a.wg.Wait()
err := a.conn.Close()
if err != nil && !errors.Is(err, amqp.ErrClosed) {
a.Log.Errorf("Error closing AMQP connection: %s", err)
return
}
}
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) {
// make new tls config
tlsCfg, err := a.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
var auth []amqp.Authentication
if strings.EqualFold(a.AuthMethod, "EXTERNAL") {
auth = []amqp.Authentication{&externalAuth{}}
} else if !a.Username.Empty() || !a.Password.Empty() {
username, err := a.Username.Get()
if err != nil {
return nil, fmt.Errorf("getting username failed: %w", err)
}
defer username.Destroy()
password, err := a.Password.Get()
if err != nil {
return nil, fmt.Errorf("getting password failed: %w", err)
}
defer password.Destroy()
auth = []amqp.Authentication{
&amqp.PlainAuth{
Username: username.String(),
Password: password.String(),
},
}
}
amqpConfig := amqp.Config{
TLSClientConfig: tlsCfg,
SASL: auth, // if nil, it will be PLAIN
Dial: amqp.DefaultDial(time.Duration(a.Timeout)),
}
return &amqpConfig, nil
}
func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) {
brokers := a.Brokers
@ -477,20 +489,6 @@ func (a *AMQPConsumer) onDelivery(track telegraf.DeliveryInfo) bool {
return true
}
func (a *AMQPConsumer) Stop() {
// We did not connect successfully so there is nothing to do here.
if a.conn == nil || a.conn.IsClosed() {
return
}
a.cancel()
a.wg.Wait()
err := a.conn.Close()
if err != nil && !errors.Is(err, amqp.ErrClosed) {
a.Log.Errorf("Error closing AMQP connection: %s", err)
return
}
}
func init() {
inputs.Add("amqp_consumer", func() telegraf.Input {
return &AMQPConsumer{Timeout: config.Duration(30 * time.Second)}

View File

@ -21,19 +21,19 @@ import (
//go:embed sample.conf
var sampleConfig string
type RoleType int
type roleType int
const (
Unknown RoleType = iota
Leader
Follower
unknown roleType = iota
leader
follower
)
func (r RoleType) String() string {
func (r roleType) String() string {
switch r {
case Leader:
case leader:
return "leader"
case Follower:
case follower:
return "follower"
default:
return "unknown"
@ -45,7 +45,7 @@ var (
defaultRoles = []string{"leader", "follower"}
)
type Vars map[string]interface{}
type vars map[string]interface{}
type Aurora struct {
Schedulers []string `toml:"schedulers"`
@ -136,7 +136,7 @@ func (a *Aurora) initialize() error {
return nil
}
func (a *Aurora) roleEnabled(role RoleType) bool {
func (a *Aurora) roleEnabled(role roleType) bool {
if len(a.Roles) == 0 {
return true
}
@ -149,12 +149,12 @@ func (a *Aurora) roleEnabled(role RoleType) bool {
return false
}
func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, error) {
func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (roleType, error) {
loc := *origin
loc.Path = "leaderhealth"
req, err := http.NewRequest("GET", loc.String(), nil)
if err != nil {
return Unknown, err
return unknown, err
}
if a.Username != "" || a.Password != "" {
@ -164,26 +164,26 @@ func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, err
resp, err := a.client.Do(req.WithContext(ctx))
if err != nil {
return Unknown, err
return unknown, err
}
if err := resp.Body.Close(); err != nil {
return Unknown, fmt.Errorf("closing body failed: %w", err)
return unknown, fmt.Errorf("closing body failed: %w", err)
}
switch resp.StatusCode {
case http.StatusOK:
return Leader, nil
return leader, nil
case http.StatusBadGateway:
fallthrough
case http.StatusServiceUnavailable:
return Follower, nil
return follower, nil
default:
return Unknown, fmt.Errorf("%v", resp.Status)
return unknown, fmt.Errorf("%v", resp.Status)
}
}
func (a *Aurora) gatherScheduler(
ctx context.Context, origin *url.URL, role RoleType, acc telegraf.Accumulator,
ctx context.Context, origin *url.URL, role roleType, acc telegraf.Accumulator,
) error {
loc := *origin
loc.Path = "vars.json"
@ -207,16 +207,16 @@ func (a *Aurora) gatherScheduler(
return fmt.Errorf("%v", resp.Status)
}
var vars Vars
var metrics vars
decoder := json.NewDecoder(resp.Body)
decoder.UseNumber()
err = decoder.Decode(&vars)
err = decoder.Decode(&metrics)
if err != nil {
return fmt.Errorf("decoding response: %w", err)
}
var fields = make(map[string]interface{}, len(vars))
for k, v := range vars {
var fields = make(map[string]interface{}, len(metrics))
for k, v := range metrics {
switch v := v.(type) {
case json.Number:
// Aurora encodes numbers as you would specify them as a literal,

View File

@ -12,8 +12,8 @@ import (
)
type (
TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
CheckFunc func(t *testing.T, err error, acc *testutil.Accumulator)
testHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
checkFunc func(t *testing.T, err error, acc *testutil.Accumulator)
)
func TestAurora(t *testing.T) {
@ -28,9 +28,9 @@ func TestAurora(t *testing.T) {
plugin *Aurora
schedulers []string
roles []string
leaderhealth TestHandlerFunc
varsjson TestHandlerFunc
check CheckFunc
leaderhealth testHandlerFunc
varsjson testHandlerFunc
check checkFunc
}{
{
name: "minimal",

View File

@ -21,9 +21,9 @@ type AzureMonitor struct {
ClientSecret string `toml:"client_secret"`
TenantID string `toml:"tenant_id"`
CloudOption string `toml:"cloud_option,omitempty"`
ResourceTargets []*ResourceTarget `toml:"resource_target"`
ResourceGroupTargets []*ResourceGroupTarget `toml:"resource_group_target"`
SubscriptionTargets []*Resource `toml:"subscription_target"`
ResourceTargets []*resourceTarget `toml:"resource_target"`
ResourceGroupTargets []*resourceGroupTarget `toml:"resource_group_target"`
SubscriptionTargets []*resource `toml:"subscription_target"`
Log telegraf.Logger `toml:"-"`
receiver *receiver.AzureMonitorMetricsReceiver
@ -31,18 +31,18 @@ type AzureMonitor struct {
azureClients *receiver.AzureClients
}
type ResourceTarget struct {
type resourceTarget struct {
ResourceID string `toml:"resource_id"`
Metrics []string `toml:"metrics"`
Aggregations []string `toml:"aggregations"`
}
type ResourceGroupTarget struct {
type resourceGroupTarget struct {
ResourceGroup string `toml:"resource_group"`
Resources []*Resource `toml:"resource"`
Resources []*resource `toml:"resource"`
}
type Resource struct {
type resource struct {
ResourceType string `toml:"resource_type"`
Metrics []string `toml:"metrics"`
Aggregations []string `toml:"aggregations"`
@ -62,7 +62,6 @@ func (am *AzureMonitor) SampleConfig() string {
return sampleConfig
}
// Init is for setup, and validating config.
func (am *AzureMonitor) Init() error {
var clientOptions azcore.ClientOptions
switch am.CloudOption {

View File

@ -42,45 +42,8 @@ func (a *AzureStorageQueue) Init() error {
return nil
}
func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) {
if a.serviceURL == nil {
_url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net")
if err != nil {
return azqueue.ServiceURL{}, err
}
credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
if err != nil {
return azqueue.ServiceURL{}, err
}
pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{})
serviceURL := azqueue.NewServiceURL(*_url, pipeline)
a.serviceURL = &serviceURL
}
return *a.serviceURL, nil
}
func (a *AzureStorageQueue) GatherQueueMetrics(
acc telegraf.Accumulator,
queueItem azqueue.QueueItem,
properties *azqueue.QueueGetPropertiesResponse,
peekedMessage *azqueue.PeekedMessage,
) {
fields := make(map[string]interface{})
tags := make(map[string]string)
tags["queue"] = strings.TrimSpace(queueItem.Name)
tags["account"] = a.StorageAccountName
fields["size"] = properties.ApproximateMessagesCount()
if peekedMessage != nil {
fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano()
}
acc.AddFields("azure_storage_queues", fields, tags)
}
func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error {
serviceURL, err := a.GetServiceURL()
serviceURL, err := a.getServiceURL()
if err != nil {
return err
}
@ -117,12 +80,49 @@ func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error {
}
}
a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage)
a.gatherQueueMetrics(acc, queueItem, properties, peekedMessage)
}
}
return nil
}
func (a *AzureStorageQueue) getServiceURL() (azqueue.ServiceURL, error) {
if a.serviceURL == nil {
_url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net")
if err != nil {
return azqueue.ServiceURL{}, err
}
credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
if err != nil {
return azqueue.ServiceURL{}, err
}
pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{})
serviceURL := azqueue.NewServiceURL(*_url, pipeline)
a.serviceURL = &serviceURL
}
return *a.serviceURL, nil
}
func (a *AzureStorageQueue) gatherQueueMetrics(
acc telegraf.Accumulator,
queueItem azqueue.QueueItem,
properties *azqueue.QueueGetPropertiesResponse,
peekedMessage *azqueue.PeekedMessage,
) {
fields := make(map[string]interface{})
tags := make(map[string]string)
tags["queue"] = strings.TrimSpace(queueItem.Name)
tags["account"] = a.StorageAccountName
fields["size"] = properties.ApproximateMessagesCount()
if peekedMessage != nil {
fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano()
}
acc.AddFields("azure_storage_queues", fields, tags)
}
func init() {
inputs.Add("azure_storage_queue", func() telegraf.Input {
return &AzureStorageQueue{PeekOldestMessageAge: true}

View File

@ -20,8 +20,44 @@ import (
var sampleConfig string
type Bcache struct {
BcachePath string
BcacheDevs []string
BcachePath string `toml:"bcachePath"`
BcacheDevs []string `toml:"bcacheDevs"`
}
func (*Bcache) SampleConfig() string {
return sampleConfig
}
func (b *Bcache) Gather(acc telegraf.Accumulator) error {
bcacheDevsChecked := make(map[string]bool)
var restrictDevs bool
if len(b.BcacheDevs) != 0 {
restrictDevs = true
for _, bcacheDev := range b.BcacheDevs {
bcacheDevsChecked[bcacheDev] = true
}
}
bcachePath := b.BcachePath
if len(bcachePath) == 0 {
bcachePath = "/sys/fs/bcache"
}
bdevs, err := filepath.Glob(bcachePath + "/*/bdev*")
if len(bdevs) < 1 || err != nil {
return errors.New("can't find any bcache device")
}
for _, bdev := range bdevs {
if restrictDevs {
bcacheDev := getTags(bdev)["bcache_dev"]
if !bcacheDevsChecked[bcacheDev] {
continue
}
}
if err := b.gatherBcache(bdev, acc); err != nil {
return fmt.Errorf("gathering bcache failed: %w", err)
}
}
return nil
}
func getTags(bdev string) map[string]string {
@ -102,42 +138,6 @@ func (b *Bcache) gatherBcache(bdev string, acc telegraf.Accumulator) error {
return nil
}
func (*Bcache) SampleConfig() string {
return sampleConfig
}
func (b *Bcache) Gather(acc telegraf.Accumulator) error {
bcacheDevsChecked := make(map[string]bool)
var restrictDevs bool
if len(b.BcacheDevs) != 0 {
restrictDevs = true
for _, bcacheDev := range b.BcacheDevs {
bcacheDevsChecked[bcacheDev] = true
}
}
bcachePath := b.BcachePath
if len(bcachePath) == 0 {
bcachePath = "/sys/fs/bcache"
}
bdevs, err := filepath.Glob(bcachePath + "/*/bdev*")
if len(bdevs) < 1 || err != nil {
return errors.New("can't find any bcache device")
}
for _, bdev := range bdevs {
if restrictDevs {
bcacheDev := getTags(bdev)["bcache_dev"]
if !bcacheDevsChecked[bcacheDev] {
continue
}
}
if err := b.gatherBcache(bdev, acc); err != nil {
return fmt.Errorf("gathering bcache failed: %w", err)
}
}
return nil
}
func init() {
inputs.Add("bcache", func() telegraf.Input {
return &Bcache{}

View File

@ -20,23 +20,10 @@ import (
//go:embed sample.conf
var sampleConfig string
const suffixInfo = "/"
const suffixStats = "/stats"
type Info struct {
Beat string `json:"beat"`
Hostname string `json:"hostname"`
Name string `json:"name"`
UUID string `json:"uuid"`
Version string `json:"version"`
}
type Stats struct {
Beat map[string]interface{} `json:"beat"`
FileBeat interface{} `json:"filebeat"`
Libbeat interface{} `json:"libbeat"`
System interface{} `json:"system"`
}
const (
suffixInfo = "/"
suffixStats = "/stats"
)
type Beat struct {
URL string `toml:"url"`
@ -54,14 +41,19 @@ type Beat struct {
client *http.Client
}
func NewBeat() *Beat {
return &Beat{
URL: "http://127.0.0.1:5066",
Includes: []string{"beat", "libbeat", "filebeat"},
Method: "GET",
Headers: make(map[string]string),
Timeout: config.Duration(time.Second * 5),
}
type info struct {
Beat string `json:"beat"`
Hostname string `json:"hostname"`
Name string `json:"name"`
UUID string `json:"uuid"`
Version string `json:"version"`
}
type stats struct {
Beat map[string]interface{} `json:"beat"`
FileBeat interface{} `json:"filebeat"`
LibBeat interface{} `json:"libbeat"`
System interface{} `json:"system"`
}
func (*Beat) SampleConfig() string {
@ -86,6 +78,67 @@ func (beat *Beat) Init() error {
return nil
}
func (beat *Beat) Gather(accumulator telegraf.Accumulator) error {
beatStats := &stats{}
beatInfo := &info{}
infoURL, err := url.Parse(beat.URL + suffixInfo)
if err != nil {
return err
}
statsURL, err := url.Parse(beat.URL + suffixStats)
if err != nil {
return err
}
err = beat.gatherJSONData(infoURL.String(), beatInfo)
if err != nil {
return err
}
tags := map[string]string{
"beat_beat": beatInfo.Beat,
"beat_id": beatInfo.UUID,
"beat_name": beatInfo.Name,
"beat_host": beatInfo.Hostname,
"beat_version": beatInfo.Version,
}
err = beat.gatherJSONData(statsURL.String(), beatStats)
if err != nil {
return err
}
for _, name := range beat.Includes {
var stats interface{}
var metric string
switch name {
case "beat":
stats = beatStats.Beat
metric = "beat"
case "filebeat":
stats = beatStats.FileBeat
metric = "beat_filebeat"
case "system":
stats = beatStats.System
metric = "beat_system"
case "libbeat":
stats = beatStats.LibBeat
metric = "beat_libbeat"
default:
return fmt.Errorf("unknown stats-type %q", name)
}
flattener := parsers_json.JSONFlattener{}
err := flattener.FullFlattenJSON("", stats, true, true)
if err != nil {
return err
}
accumulator.AddFields(metric, flattener.Fields, tags)
}
return nil
}
// createHTTPClient create a clients to access API
func (beat *Beat) createHTTPClient() (*http.Client, error) {
tlsConfig, err := beat.ClientConfig.TLSConfig()
@ -130,69 +183,18 @@ func (beat *Beat) gatherJSONData(address string, value interface{}) error {
return json.NewDecoder(response.Body).Decode(value)
}
func (beat *Beat) Gather(accumulator telegraf.Accumulator) error {
beatStats := &Stats{}
beatInfo := &Info{}
infoURL, err := url.Parse(beat.URL + suffixInfo)
if err != nil {
return err
func newBeat() *Beat {
return &Beat{
URL: "http://127.0.0.1:5066",
Includes: []string{"beat", "libbeat", "filebeat"},
Method: "GET",
Headers: make(map[string]string),
Timeout: config.Duration(time.Second * 5),
}
statsURL, err := url.Parse(beat.URL + suffixStats)
if err != nil {
return err
}
err = beat.gatherJSONData(infoURL.String(), beatInfo)
if err != nil {
return err
}
tags := map[string]string{
"beat_beat": beatInfo.Beat,
"beat_id": beatInfo.UUID,
"beat_name": beatInfo.Name,
"beat_host": beatInfo.Hostname,
"beat_version": beatInfo.Version,
}
err = beat.gatherJSONData(statsURL.String(), beatStats)
if err != nil {
return err
}
for _, name := range beat.Includes {
var stats interface{}
var metric string
switch name {
case "beat":
stats = beatStats.Beat
metric = "beat"
case "filebeat":
stats = beatStats.FileBeat
metric = "beat_filebeat"
case "system":
stats = beatStats.System
metric = "beat_system"
case "libbeat":
stats = beatStats.Libbeat
metric = "beat_libbeat"
default:
return fmt.Errorf("unknown stats-type %q", name)
}
flattener := parsers_json.JSONFlattener{}
err := flattener.FullFlattenJSON("", stats, true, true)
if err != nil {
return err
}
accumulator.AddFields(metric, flattener.Fields, tags)
}
return nil
}
func init() {
inputs.Add("beat", func() telegraf.Input {
return NewBeat()
return newBeat()
})
}

View File

@ -16,7 +16,7 @@ import (
func Test_BeatStats(t *testing.T) {
var beat6StatsAccumulator testutil.Accumulator
var beatTest = NewBeat()
var beatTest = newBeat()
// System stats are disabled by default
beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"}
require.NoError(t, beatTest.Init())
@ -160,7 +160,7 @@ func Test_BeatStats(t *testing.T) {
func Test_BeatRequest(t *testing.T) {
var beat6StatsAccumulator testutil.Accumulator
beatTest := NewBeat()
beatTest := newBeat()
// System stats are disabled by default
beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"}
require.NoError(t, beatTest.Init())

View File

@ -17,13 +17,12 @@ import (
//go:embed sample.conf
var sampleConfig string
// default host proc path
const defaultHostProc = "/proc"
const defaultHostSys = "/sys"
// env host proc variable name
const envProc = "HOST_PROC"
const envSys = "HOST_SYS"
const (
defaultHostProc = "/proc"
defaultHostSys = "/sys"
envProc = "HOST_PROC"
envSys = "HOST_SYS"
)
type Bond struct {
HostProc string `toml:"host_proc"`

View File

@ -31,7 +31,7 @@ const (
)
type (
burrow struct {
Burrow struct {
tls.ClientConfig
Servers []string
@ -91,17 +91,11 @@ type (
}
)
func init() {
inputs.Add("burrow", func() telegraf.Input {
return &burrow{}
})
}
func (*burrow) SampleConfig() string {
func (*Burrow) SampleConfig() string {
return sampleConfig
}
func (b *burrow) Gather(acc telegraf.Accumulator) error {
func (b *Burrow) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
if len(b.Servers) == 0 {
@ -141,7 +135,7 @@ func (b *burrow) Gather(acc telegraf.Accumulator) error {
return nil
}
func (b *burrow) setDefaults() {
func (b *Burrow) setDefaults() {
if b.APIPrefix == "" {
b.APIPrefix = defaultBurrowPrefix
}
@ -153,7 +147,7 @@ func (b *burrow) setDefaults() {
}
}
func (b *burrow) compileGlobs() error {
func (b *Burrow) compileGlobs() error {
var err error
// compile glob patterns
@ -172,7 +166,7 @@ func (b *burrow) compileGlobs() error {
return nil
}
func (b *burrow) createClient() (*http.Client, error) {
func (b *Burrow) createClient() (*http.Client, error) {
tlsCfg, err := b.ClientConfig.TLSConfig()
if err != nil {
return nil, err
@ -199,7 +193,7 @@ func (b *burrow) createClient() (*http.Client, error) {
return client, nil
}
func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) {
func (b *Burrow) getResponse(u *url.URL) (*apiResponse, error) {
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
@ -224,7 +218,7 @@ func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) {
return ares, dec.Decode(ares)
}
func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {
func (b *Burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {
var wg sync.WaitGroup
r, err := b.getResponse(src)
@ -263,7 +257,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error {
return nil
}
func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
func (b *Burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
var wg sync.WaitGroup
r, err := b.getResponse(src)
@ -302,7 +296,7 @@ func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string,
wg.Wait()
}
func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) {
func (b *Burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) {
for i, offset := range r.Offsets {
tags := map[string]string{
"cluster": cluster,
@ -320,7 +314,7 @@ func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc tele
}
}
func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
func (b *Burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) {
var wg sync.WaitGroup
r, err := b.getResponse(src)
@ -360,7 +354,7 @@ func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string,
wg.Wait()
}
func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
func (b *Burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
partitionCount := r.Status.PartitionCount
if partitionCount == 0 {
partitionCount = len(r.Status.Partitions)
@ -399,7 +393,7 @@ func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, ac
)
}
func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
func (b *Burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) {
for _, partition := range r.Status.Partitions {
if !b.filterTopics.Match(partition.Topic) {
continue
@ -455,3 +449,9 @@ func mapStatusToCode(src string) int {
return 0
}
}
func init() {
inputs.Add("burrow", func() telegraf.Input {
return &Burrow{}
})
}

View File

@ -73,7 +73,7 @@ func TestBurrowTopic(t *testing.T) {
s := getHTTPServer()
defer s.Close()
plugin := &burrow{Servers: []string{s.URL}}
plugin := &Burrow{Servers: []string{s.URL}}
acc := &testutil.Accumulator{}
require.NoError(t, plugin.Gather(acc))
@ -102,7 +102,7 @@ func TestBurrowPartition(t *testing.T) {
s := getHTTPServer()
defer s.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s.URL},
}
acc := &testutil.Accumulator{}
@ -150,7 +150,7 @@ func TestBurrowGroup(t *testing.T) {
s := getHTTPServer()
defer s.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s.URL},
}
acc := &testutil.Accumulator{}
@ -188,7 +188,7 @@ func TestMultipleServers(t *testing.T) {
s2 := getHTTPServer()
defer s2.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s1.URL, s2.URL},
}
acc := &testutil.Accumulator{}
@ -203,7 +203,7 @@ func TestMultipleRuns(t *testing.T) {
s := getHTTPServer()
defer s.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s.URL},
}
for i := 0; i < 4; i++ {
@ -220,7 +220,7 @@ func TestBasicAuthConfig(t *testing.T) {
s := getHTTPServerBasicAuth()
defer s.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s.URL},
Username: "test",
Password: "test",
@ -238,7 +238,7 @@ func TestFilterClusters(t *testing.T) {
s := getHTTPServer()
defer s.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s.URL},
ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match
}
@ -256,7 +256,7 @@ func TestFilterGroups(t *testing.T) {
s := getHTTPServer()
defer s.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s.URL},
GroupsInclude: []string{"group?"}, // group1 -> match
TopicsExclude: []string{"*"}, // exclude all
@ -274,7 +274,7 @@ func TestFilterTopics(t *testing.T) {
s := getHTTPServer()
defer s.Close()
plugin := &burrow{
plugin := &Burrow{
Servers: []string{s.URL},
TopicsInclude: []string{"topic?"}, // topicA -> match
GroupsExclude: []string{"*"}, // exclude all