diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index c7c437912..1ed701e2b 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -87,22 +87,6 @@ type stats struct { DequeueCounter int `xml:"dequeueCounter,attr"` } -func (a *ActiveMQ) createHTTPClient() (*http.Client, error) { - tlsCfg, err := a.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - }, - Timeout: time.Duration(a.ResponseTimeout), - } - - return client, nil -} - func (*ActiveMQ) SampleConfig() string { return sampleConfig } @@ -138,7 +122,61 @@ func (a *ActiveMQ) Init() error { return nil } -func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) { +func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { + dataQueues, err := a.getMetrics(a.queuesURL()) + if err != nil { + return err + } + queues := queues{} + err = xml.Unmarshal(dataQueues, &queues) + if err != nil { + return fmt.Errorf("queues XML unmarshal error: %w", err) + } + + dataTopics, err := a.getMetrics(a.topicsURL()) + if err != nil { + return err + } + topics := topics{} + err = xml.Unmarshal(dataTopics, &topics) + if err != nil { + return fmt.Errorf("topics XML unmarshal error: %w", err) + } + + dataSubscribers, err := a.getMetrics(a.subscribersURL()) + if err != nil { + return err + } + subscribers := subscribers{} + err = xml.Unmarshal(dataSubscribers, &subscribers) + if err != nil { + return fmt.Errorf("subscribers XML unmarshal error: %w", err) + } + + a.gatherQueuesMetrics(acc, queues) + a.gatherTopicsMetrics(acc, topics) + a.gatherSubscribersMetrics(acc, subscribers) + + return nil +} + +func (a *ActiveMQ) createHTTPClient() (*http.Client, error) { + tlsCfg, err := a.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: time.Duration(a.ResponseTimeout), + } + + return client, nil +} + +func (a *ActiveMQ) getMetrics(u string) ([]byte, error) { req, err := http.NewRequest("GET", u, nil) if err != nil { return nil, err @@ -161,7 +199,7 @@ func (a *ActiveMQ) GetMetrics(u string) ([]byte, error) { return io.ReadAll(resp.Body) } -func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues queues) { +func (a *ActiveMQ) gatherQueuesMetrics(acc telegraf.Accumulator, queues queues) { for _, queue := range queues.QueueItems { records := make(map[string]interface{}) tags := make(map[string]string) @@ -179,7 +217,7 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues queues) } } -func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics topics) { +func (a *ActiveMQ) gatherTopicsMetrics(acc telegraf.Accumulator, topics topics) { for _, topic := range topics.TopicItems { records := make(map[string]interface{}) tags := make(map[string]string) @@ -197,7 +235,7 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics topics) } } -func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers subscribers) { +func (a *ActiveMQ) gatherSubscribersMetrics(acc telegraf.Accumulator, subscribers subscribers) { for _, subscriber := range subscribers.SubscriberItems { records := make(map[string]interface{}) tags := make(map[string]string) @@ -221,55 +259,17 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber } } -func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { - dataQueues, err := a.GetMetrics(a.QueuesURL()) - if err != nil { - return err - } - queues := queues{} - err = xml.Unmarshal(dataQueues, &queues) - if err != nil { - return fmt.Errorf("queues XML unmarshal error: %w", err) - } - - dataTopics, err := a.GetMetrics(a.TopicsURL()) - if err != nil { - return err - } - topics := topics{} - err = xml.Unmarshal(dataTopics, &topics) - if err != nil { - return fmt.Errorf("topics XML unmarshal error: %w", err) - } - - dataSubscribers, err := a.GetMetrics(a.SubscribersURL()) - if err != nil { - return err - } - subscribers := subscribers{} - err = xml.Unmarshal(dataSubscribers, &subscribers) - if err != nil { - return fmt.Errorf("subscribers XML unmarshal error: %w", err) - } - - a.GatherQueuesMetrics(acc, queues) - a.GatherTopicsMetrics(acc, topics) - a.GatherSubscribersMetrics(acc, subscribers) - - return nil -} - -func (a *ActiveMQ) QueuesURL() string { +func (a *ActiveMQ) queuesURL() string { ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/queues.jsp")} return a.baseURL.ResolveReference(&ref).String() } -func (a *ActiveMQ) TopicsURL() string { +func (a *ActiveMQ) topicsURL() string { ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/topics.jsp")} return a.baseURL.ResolveReference(&ref).String() } -func (a *ActiveMQ) SubscribersURL() string { +func (a *ActiveMQ) subscribersURL() string { ref := url.URL{Path: path.Join("/", a.Webadmin, "/xml/subscribers.jsp")} return a.baseURL.ResolveReference(&ref).String() } diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go index 8bf78de37..cfece824c 100644 --- a/plugins/inputs/activemq/activemq_test.go +++ b/plugins/inputs/activemq/activemq_test.go @@ -52,7 +52,7 @@ func TestGatherQueuesMetrics(t *testing.T) { require.NoError(t, plugin.Init()) var acc testutil.Accumulator - plugin.GatherQueuesMetrics(&acc, queues) + plugin.gatherQueuesMetrics(&acc, queues) acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags) } @@ -98,7 +98,7 @@ func TestGatherTopicsMetrics(t *testing.T) { require.NoError(t, plugin.Init()) var acc testutil.Accumulator - plugin.GatherTopicsMetrics(&acc, topics) + plugin.gatherTopicsMetrics(&acc, topics) acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags) } @@ -137,7 +137,7 @@ func TestGatherSubscribersMetrics(t *testing.T) { require.NoError(t, plugin.Init()) var acc testutil.Accumulator - plugin.GatherSubscribersMetrics(&acc, subscribers) + plugin.gatherSubscribersMetrics(&acc, subscribers) acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags) } diff --git a/plugins/inputs/aliyuncms/aliyuncms.go b/plugins/inputs/aliyuncms/aliyuncms.go index be2b3d112..c20464c0f 100644 --- a/plugins/inputs/aliyuncms/aliyuncms.go +++ b/plugins/inputs/aliyuncms/aliyuncms.go @@ -27,7 +27,6 @@ import ( var sampleConfig string type ( - // AliyunCMS is aliyun cms config info. AliyunCMS struct { AccessKeyID string `toml:"access_key_id"` AccessKeySecret string `toml:"access_key_secret"` @@ -43,7 +42,7 @@ type ( Period config.Duration `toml:"period"` Delay config.Duration `toml:"delay"` Project string `toml:"project"` - Metrics []*Metric `toml:"metrics"` + Metrics []*metric `toml:"metrics"` RateLimit int `toml:"ratelimit"` Log telegraf.Logger `toml:"-"` @@ -57,8 +56,8 @@ type ( measurement string } - // Metric describes what metrics to get - Metric struct { + // metric describes what metrics to get + metric struct { ObjectsFilter string `toml:"objects_filter"` MetricNames []string `toml:"names"` Dimensions string `toml:"dimensions"` // String representation of JSON dimensions @@ -74,11 +73,6 @@ type ( } - // Dimension describe how to get metrics - Dimension struct { - Value string `toml:"value"` - } - aliyuncmsClient interface { DescribeMetricList(request *cms.DescribeMetricListRequest) (response *cms.DescribeMetricListResponse, err error) } @@ -113,7 +107,6 @@ func (*AliyunCMS) SampleConfig() string { return sampleConfig } -// Init perform checks of plugin inputs and initialize internals func (s *AliyunCMS) Init() error { if s.Project == "" { return errors.New("project is not set") @@ -216,7 +209,6 @@ func (s *AliyunCMS) Start(telegraf.Accumulator) error { return nil } -// Gather implements telegraf.Inputs interface func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error { s.updateWindow(time.Now()) @@ -225,16 +217,16 @@ func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error { defer lmtr.Stop() var wg sync.WaitGroup - for _, metric := range s.Metrics { + for _, m := range s.Metrics { // Prepare internal structure with data from discovery - s.prepareTagsAndDimensions(metric) - wg.Add(len(metric.MetricNames)) - for _, metricName := range metric.MetricNames { + s.prepareTagsAndDimensions(m) + wg.Add(len(m.MetricNames)) + for _, metricName := range m.MetricNames { <-lmtr.C - go func(metricName string, metric *Metric) { + go func(metricName string, m *metric) { defer wg.Done() - acc.AddError(s.gatherMetric(acc, metricName, metric)) - }(metricName, metric) + acc.AddError(s.gatherMetric(acc, metricName, m)) + }(metricName, m) } wg.Wait() } @@ -269,7 +261,7 @@ func (s *AliyunCMS) updateWindow(relativeTo time.Time) { } // Gather given metric and emit error -func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *Metric) error { +func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *metric) error { for _, region := range s.Regions { req := cms.CreateDescribeMetricListRequest() req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10) @@ -372,7 +364,7 @@ func parseTag(tagSpec string, data interface{}) (tagKey, tagValue string, err er return tagKey, tagValue, nil } -func (s *AliyunCMS) prepareTagsAndDimensions(metric *Metric) { +func (s *AliyunCMS) prepareTagsAndDimensions(metric *metric) { var ( newData bool defaultTags = []string{"RegionId:RegionId"} diff --git a/plugins/inputs/aliyuncms/aliyuncms_test.go b/plugins/inputs/aliyuncms/aliyuncms_test.go index 5b0e48f77..ebd7c8bfb 100644 --- a/plugins/inputs/aliyuncms/aliyuncms_test.go +++ b/plugins/inputs/aliyuncms/aliyuncms_test.go @@ -240,7 +240,7 @@ func TestPluginMetricsInitialize(t *testing.T) { expectedErrorString string regions []string discoveryRegions []string - metrics []*Metric + metrics []*metric }{ { name: "Valid project", @@ -248,7 +248,7 @@ func TestPluginMetricsInitialize(t *testing.T) { regions: []string{"cn-shanghai"}, accessKeyID: "dummy", accessKeySecret: "dummy", - metrics: []*Metric{ + metrics: []*metric{ { MetricNames: []string{}, Dimensions: `{"instanceId": "i-abcdefgh123456"}`, @@ -261,7 +261,7 @@ func TestPluginMetricsInitialize(t *testing.T) { regions: []string{"cn-shanghai"}, accessKeyID: "dummy", accessKeySecret: "dummy", - metrics: []*Metric{ + metrics: []*metric{ { MetricNames: []string{}, Dimensions: `[{"instanceId": "p-example"},{"instanceId": "q-example"}]`, @@ -275,7 +275,7 @@ func TestPluginMetricsInitialize(t *testing.T) { accessKeyID: "dummy", accessKeySecret: "dummy", expectedErrorString: `cannot parse dimensions (neither obj, nor array) "[": unexpected end of JSON input`, - metrics: []*Metric{ + metrics: []*metric{ { MetricNames: []string{}, Dimensions: `[`, @@ -343,7 +343,7 @@ func TestGatherMetric(t *testing.T) { Regions: []string{"cn-shanghai"}, } - metric := &Metric{ + metric := &metric{ MetricNames: []string{}, Dimensions: `"instanceId": "i-abcdefgh123456"`, } @@ -374,7 +374,7 @@ func TestGatherMetric(t *testing.T) { } func TestGather(t *testing.T) { - metric := &Metric{ + m := &metric{ MetricNames: []string{}, Dimensions: `{"instanceId": "i-abcdefgh123456"}`, } @@ -382,7 +382,7 @@ func TestGather(t *testing.T) { AccessKeyID: "my_access_key_id", AccessKeySecret: "my_access_key_secret", Project: "acs_slb_dashboard", - Metrics: []*Metric{metric}, + Metrics: []*metric{m}, RateLimit: 200, measurement: formatMeasurement("acs_slb_dashboard"), Regions: []string{"cn-shanghai"}, diff --git a/plugins/inputs/aliyuncms/discovery.go b/plugins/inputs/aliyuncms/discovery.go index 9a7210647..574570e14 100644 --- a/plugins/inputs/aliyuncms/discovery.go +++ b/plugins/inputs/aliyuncms/discovery.go @@ -395,8 +395,7 @@ func (dt *discoveryTool) getDiscoveryDataAcrossRegions(lmtr chan bool) (map[stri return resultData, nil } -// start the discovery pooling -// In case smth. new found it will be reported back through `DataChan` +// start the discovery pooling; in case something new is found, it will be reported back through `dataChan` func (dt *discoveryTool) start() { var ( err error @@ -443,8 +442,7 @@ func (dt *discoveryTool) start() { }() } -// stop the discovery loop, making sure -// all data is read from 'dataChan' +// stop the discovery loop, making sure all data is read from 'dataChan' func (dt *discoveryTool) stop() { close(dt.done) diff --git a/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go b/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go index 922fc56c0..d30f13ba6 100644 --- a/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go +++ b/plugins/inputs/amd_rocm_smi/amd_rocm_smi.go @@ -28,215 +28,7 @@ type ROCmSMI struct { Log telegraf.Logger `toml:"-"` } -func (*ROCmSMI) SampleConfig() string { - return sampleConfig -} - -// Gather implements the telegraf interface -func (rsmi *ROCmSMI) Gather(acc telegraf.Accumulator) error { - data, err := rsmi.pollROCmSMI() - if err != nil { - return fmt.Errorf("failed to execute command in pollROCmSMI: %w", err) - } - - return gatherROCmSMI(data, acc) -} - -func (rsmi *ROCmSMI) Start(telegraf.Accumulator) error { - if _, err := os.Stat(rsmi.BinPath); os.IsNotExist(err) { - binPath, err := exec.LookPath("rocm-smi") - if err != nil { - return &internal.StartupError{Err: err} - } - rsmi.BinPath = binPath - } - - return nil -} - -func (rsmi *ROCmSMI) Stop() {} - -func init() { - inputs.Add("amd_rocm_smi", func() telegraf.Input { - return &ROCmSMI{ - BinPath: "/opt/rocm/bin/rocm-smi", - Timeout: config.Duration(5 * time.Second), - } - }) -} - -func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) { - // Construct and execute metrics query, there currently exist (ROCm v4.3.x) a "-a" option - // that does not provide all the information, so each needed parameter is set manually - cmd := exec.Command(rsmi.BinPath, - "-o", - "-l", - "-m", - "-M", - "-g", - "-c", - "-t", - "-u", - "-i", - "-f", - "-p", - "-P", - "-s", - "-S", - "-v", - "--showreplaycount", - "--showpids", - "--showdriverversion", - "--showmemvendor", - "--showfwinfo", - "--showproductname", - "--showserial", - "--showuniqueid", - "--showbus", - "--showpendingpages", - "--showpagesinfo", - "--showmeminfo", - "all", - "--showretiredpages", - "--showunreservablepages", - "--showmemuse", - "--showvoltage", - "--showtopo", - "--showtopoweight", - "--showtopohops", - "--showtopotype", - "--showtoponuma", - "--json") - - return internal.StdOutputTimeout(cmd, time.Duration(rsmi.Timeout)) -} - -func gatherROCmSMI(ret []byte, acc telegraf.Accumulator) error { - var gpus map[string]GPU - var sys map[string]sysInfo - - err1 := json.Unmarshal(ret, &gpus) - if err1 != nil { - return err1 - } - - err2 := json.Unmarshal(ret, &sys) - if err2 != nil { - return err2 - } - - metrics := genTagsFields(gpus, sys) - for _, metric := range metrics { - acc.AddFields(measurement, metric.fields, metric.tags) - } - - return nil -} - -type metric struct { - tags map[string]string - fields map[string]interface{} -} - -func genTagsFields(gpus map[string]GPU, system map[string]sysInfo) []metric { - metrics := []metric{} - for cardID := range gpus { - if strings.Contains(cardID, "card") { - tags := map[string]string{ - "name": cardID, - } - fields := map[string]interface{}{} - - payload := gpus[cardID] - //nolint:errcheck // silently treat as zero if malformed - totVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalMemory, 10, 64) - //nolint:errcheck // silently treat as zero if malformed - usdVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalUsedMemory, 10, 64) - strFree := strconv.FormatInt(totVRAM-usdVRAM, 10) - - // Try using value found in Device ID first. If not found, try GPU - // ID for backwards compatibility. - setTagIfUsed(tags, "gpu_id", payload.DeviceID) - setTagIfUsed(tags, "gpu_id", payload.GpuID) - - setTagIfUsed(tags, "gpu_unique_id", payload.GpuUniqueID) - - setIfUsed("int", fields, "driver_version", strings.ReplaceAll(system["system"].DriverVersion, ".", "")) - setIfUsed("int", fields, "fan_speed", payload.GpuFanSpeedPercentage) - setIfUsed("int64", fields, "memory_total", payload.GpuVRAMTotalMemory) - setIfUsed("int64", fields, "memory_used", payload.GpuVRAMTotalUsedMemory) - setIfUsed("int64", fields, "memory_free", strFree) - setIfUsed("float", fields, "temperature_sensor_edge", payload.GpuTemperatureSensorEdge) - setIfUsed("float", fields, "temperature_sensor_junction", payload.GpuTemperatureSensorJunction) - setIfUsed("float", fields, "temperature_sensor_memory", payload.GpuTemperatureSensorMemory) - setIfUsed("int", fields, "utilization_gpu", payload.GpuUsePercentage) - // Try using allocated percentage first. - setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryAllocatedPercentage) - setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryUsePercentage) - setIfUsed("int", fields, "clocks_current_sm", strings.Trim(payload.GpuSclkClockSpeed, "(Mhz)")) - setIfUsed("int", fields, "clocks_current_memory", strings.Trim(payload.GpuMclkClockSpeed, "(Mhz)")) - setIfUsed("int", fields, "clocks_current_display", strings.Trim(payload.GpuDcefClkClockSpeed, "(Mhz)")) - setIfUsed("int", fields, "clocks_current_fabric", strings.Trim(payload.GpuFclkClockSpeed, "(Mhz)")) - setIfUsed("int", fields, "clocks_current_system", strings.Trim(payload.GpuSocclkClockSpeed, "(Mhz)")) - setIfUsed("float", fields, "power_draw", payload.GpuAveragePower) - setIfUsed("str", fields, "card_series", payload.GpuCardSeries) - setIfUsed("str", fields, "card_model", payload.GpuCardModel) - setIfUsed("str", fields, "card_vendor", payload.GpuCardVendor) - - metrics = append(metrics, metric{tags, fields}) - } - } - return metrics -} - -func setTagIfUsed(m map[string]string, k, v string) { - if v != "" { - m[k] = v - } -} - -func setIfUsed(t string, m map[string]interface{}, k, v string) { - vals := strings.Fields(v) - if len(vals) < 1 { - return - } - - val := vals[0] - - switch t { - case "float": - if val != "" { - f, err := strconv.ParseFloat(val, 64) - if err == nil { - m[k] = f - } - } - case "int": - if val != "" { - i, err := strconv.Atoi(val) - if err == nil { - m[k] = i - } - } - case "int64": - if val != "" { - i, err := strconv.ParseInt(val, 10, 64) - if err == nil { - m[k] = i - } - } - case "str": - if val != "" { - m[k] = val - } - } -} - -type sysInfo struct { - DriverVersion string `json:"Driver version"` -} - -type GPU struct { +type gpu struct { DeviceID string `json:"Device ID"` GpuID string `json:"GPU ID"` GpuUniqueID string `json:"Unique ID"` @@ -304,3 +96,210 @@ type GPU struct { GpuGTTTotalMemory string `json:"GTT Total Memory (B)"` GpuGTTTotalUsedMemory string `json:"GTT Total Used Memory (B)"` } + +type sysInfo struct { + DriverVersion string `json:"Driver version"` +} + +type metric struct { + tags map[string]string + fields map[string]interface{} +} + +func (*ROCmSMI) SampleConfig() string { + return sampleConfig +} + +func (rsmi *ROCmSMI) Start(telegraf.Accumulator) error { + if _, err := os.Stat(rsmi.BinPath); os.IsNotExist(err) { + binPath, err := exec.LookPath("rocm-smi") + if err != nil { + return &internal.StartupError{Err: err} + } + rsmi.BinPath = binPath + } + + return nil +} + +func (rsmi *ROCmSMI) Gather(acc telegraf.Accumulator) error { + data, err := rsmi.pollROCmSMI() + if err != nil { + return fmt.Errorf("failed to execute command in pollROCmSMI: %w", err) + } + + return gatherROCmSMI(data, acc) +} + +func (rsmi *ROCmSMI) Stop() {} + +func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) { + // Construct and execute metrics query, there currently exist (ROCm v4.3.x) a "-a" option + // that does not provide all the information, so each needed parameter is set manually + cmd := exec.Command(rsmi.BinPath, + "-o", + "-l", + "-m", + "-M", + "-g", + "-c", + "-t", + "-u", + "-i", + "-f", + "-p", + "-P", + "-s", + "-S", + "-v", + "--showreplaycount", + "--showpids", + "--showdriverversion", + "--showmemvendor", + "--showfwinfo", + "--showproductname", + "--showserial", + "--showuniqueid", + "--showbus", + "--showpendingpages", + "--showpagesinfo", + "--showmeminfo", + "all", + "--showretiredpages", + "--showunreservablepages", + "--showmemuse", + "--showvoltage", + "--showtopo", + "--showtopoweight", + "--showtopohops", + "--showtopotype", + "--showtoponuma", + "--json") + + return internal.StdOutputTimeout(cmd, time.Duration(rsmi.Timeout)) +} + +func genTagsFields(gpus map[string]gpu, system map[string]sysInfo) []metric { + metrics := []metric{} + for cardID := range gpus { + if strings.Contains(cardID, "card") { + tags := map[string]string{ + "name": cardID, + } + fields := map[string]interface{}{} + + payload := gpus[cardID] + //nolint:errcheck // silently treat as zero if malformed + totVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalMemory, 10, 64) + //nolint:errcheck // silently treat as zero if malformed + usdVRAM, _ := strconv.ParseInt(payload.GpuVRAMTotalUsedMemory, 10, 64) + strFree := strconv.FormatInt(totVRAM-usdVRAM, 10) + + // Try using value found in Device ID first. If not found, try GPU + // ID for backwards compatibility. + setTagIfUsed(tags, "gpu_id", payload.DeviceID) + setTagIfUsed(tags, "gpu_id", payload.GpuID) + + setTagIfUsed(tags, "gpu_unique_id", payload.GpuUniqueID) + + setIfUsed("int", fields, "driver_version", strings.ReplaceAll(system["system"].DriverVersion, ".", "")) + setIfUsed("int", fields, "fan_speed", payload.GpuFanSpeedPercentage) + setIfUsed("int64", fields, "memory_total", payload.GpuVRAMTotalMemory) + setIfUsed("int64", fields, "memory_used", payload.GpuVRAMTotalUsedMemory) + setIfUsed("int64", fields, "memory_free", strFree) + setIfUsed("float", fields, "temperature_sensor_edge", payload.GpuTemperatureSensorEdge) + setIfUsed("float", fields, "temperature_sensor_junction", payload.GpuTemperatureSensorJunction) + setIfUsed("float", fields, "temperature_sensor_memory", payload.GpuTemperatureSensorMemory) + setIfUsed("int", fields, "utilization_gpu", payload.GpuUsePercentage) + // Try using allocated percentage first. + setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryAllocatedPercentage) + setIfUsed("int", fields, "utilization_memory", payload.GpuMemoryUsePercentage) + setIfUsed("int", fields, "clocks_current_sm", strings.Trim(payload.GpuSclkClockSpeed, "(Mhz)")) + setIfUsed("int", fields, "clocks_current_memory", strings.Trim(payload.GpuMclkClockSpeed, "(Mhz)")) + setIfUsed("int", fields, "clocks_current_display", strings.Trim(payload.GpuDcefClkClockSpeed, "(Mhz)")) + setIfUsed("int", fields, "clocks_current_fabric", strings.Trim(payload.GpuFclkClockSpeed, "(Mhz)")) + setIfUsed("int", fields, "clocks_current_system", strings.Trim(payload.GpuSocclkClockSpeed, "(Mhz)")) + setIfUsed("float", fields, "power_draw", payload.GpuAveragePower) + setIfUsed("str", fields, "card_series", payload.GpuCardSeries) + setIfUsed("str", fields, "card_model", payload.GpuCardModel) + setIfUsed("str", fields, "card_vendor", payload.GpuCardVendor) + + metrics = append(metrics, metric{tags, fields}) + } + } + return metrics +} + +func gatherROCmSMI(ret []byte, acc telegraf.Accumulator) error { + var gpus map[string]gpu + var sys map[string]sysInfo + + err1 := json.Unmarshal(ret, &gpus) + if err1 != nil { + return err1 + } + + err2 := json.Unmarshal(ret, &sys) + if err2 != nil { + return err2 + } + + metrics := genTagsFields(gpus, sys) + for _, metric := range metrics { + acc.AddFields(measurement, metric.fields, metric.tags) + } + + return nil +} + +func setTagIfUsed(m map[string]string, k, v string) { + if v != "" { + m[k] = v + } +} + +func setIfUsed(t string, m map[string]interface{}, k, v string) { + vals := strings.Fields(v) + if len(vals) < 1 { + return + } + + val := vals[0] + + switch t { + case "float": + if val != "" { + f, err := strconv.ParseFloat(val, 64) + if err == nil { + m[k] = f + } + } + case "int": + if val != "" { + i, err := strconv.Atoi(val) + if err == nil { + m[k] = i + } + } + case "int64": + if val != "" { + i, err := strconv.ParseInt(val, 10, 64) + if err == nil { + m[k] = i + } + } + case "str": + if val != "" { + m[k] = val + } + } +} + +func init() { + inputs.Add("amd_rocm_smi", func() telegraf.Input { + return &ROCmSMI{ + BinPath: "/opt/rocm/bin/rocm-smi", + Timeout: config.Duration(5 * time.Second), + } + }) +} diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index f1e2fa8d0..dba902bce 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -26,9 +26,10 @@ var sampleConfig string var once sync.Once type empty struct{} +type externalAuth struct{} + type semaphore chan empty -// AMQPConsumer is the top level struct for this plugin type AMQPConsumer struct { URL string `toml:"url" deprecated:"1.7.0;1.35.0;use 'brokers' instead"` Brokers []string `toml:"brokers"` @@ -62,11 +63,10 @@ type AMQPConsumer struct { decoder internal.ContentDecoder } -type externalAuth struct{} - func (a *externalAuth) Mechanism() string { return "EXTERNAL" } + func (a *externalAuth) Response() string { return "\000" } @@ -115,51 +115,6 @@ func (a *AMQPConsumer) SetParser(parser telegraf.Parser) { a.parser = parser } -// All gathering is done in the Start function -func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { - return nil -} - -func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { - // make new tls config - tlsCfg, err := a.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - var auth []amqp.Authentication - - if strings.EqualFold(a.AuthMethod, "EXTERNAL") { - auth = []amqp.Authentication{&externalAuth{}} - } else if !a.Username.Empty() || !a.Password.Empty() { - username, err := a.Username.Get() - if err != nil { - return nil, fmt.Errorf("getting username failed: %w", err) - } - defer username.Destroy() - - password, err := a.Password.Get() - if err != nil { - return nil, fmt.Errorf("getting password failed: %w", err) - } - defer password.Destroy() - - auth = []amqp.Authentication{ - &amqp.PlainAuth{ - Username: username.String(), - Password: password.String(), - }, - } - } - amqpConfig := amqp.Config{ - TLSClientConfig: tlsCfg, - SASL: auth, // if nil, it will be PLAIN - Dial: amqp.DefaultDial(time.Duration(a.Timeout)), - } - return &amqpConfig, nil -} - -// Start satisfies the telegraf.ServiceInput interface func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { amqpConf, err := a.createConfig() if err != nil { @@ -219,6 +174,63 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { return nil } +func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (a *AMQPConsumer) Stop() { + // We did not connect successfully so there is nothing to do here. + if a.conn == nil || a.conn.IsClosed() { + return + } + a.cancel() + a.wg.Wait() + err := a.conn.Close() + if err != nil && !errors.Is(err, amqp.ErrClosed) { + a.Log.Errorf("Error closing AMQP connection: %s", err) + return + } +} + +func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { + // make new tls config + tlsCfg, err := a.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + var auth []amqp.Authentication + + if strings.EqualFold(a.AuthMethod, "EXTERNAL") { + auth = []amqp.Authentication{&externalAuth{}} + } else if !a.Username.Empty() || !a.Password.Empty() { + username, err := a.Username.Get() + if err != nil { + return nil, fmt.Errorf("getting username failed: %w", err) + } + defer username.Destroy() + + password, err := a.Password.Get() + if err != nil { + return nil, fmt.Errorf("getting password failed: %w", err) + } + defer password.Destroy() + + auth = []amqp.Authentication{ + &amqp.PlainAuth{ + Username: username.String(), + Password: password.String(), + }, + } + } + amqpConfig := amqp.Config{ + TLSClientConfig: tlsCfg, + SASL: auth, // if nil, it will be PLAIN + Dial: amqp.DefaultDial(time.Duration(a.Timeout)), + } + return &amqpConfig, nil +} + func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) { brokers := a.Brokers @@ -477,20 +489,6 @@ func (a *AMQPConsumer) onDelivery(track telegraf.DeliveryInfo) bool { return true } -func (a *AMQPConsumer) Stop() { - // We did not connect successfully so there is nothing to do here. - if a.conn == nil || a.conn.IsClosed() { - return - } - a.cancel() - a.wg.Wait() - err := a.conn.Close() - if err != nil && !errors.Is(err, amqp.ErrClosed) { - a.Log.Errorf("Error closing AMQP connection: %s", err) - return - } -} - func init() { inputs.Add("amqp_consumer", func() telegraf.Input { return &AMQPConsumer{Timeout: config.Duration(30 * time.Second)} diff --git a/plugins/inputs/aurora/aurora.go b/plugins/inputs/aurora/aurora.go index a76ca835d..e8b10c11a 100644 --- a/plugins/inputs/aurora/aurora.go +++ b/plugins/inputs/aurora/aurora.go @@ -21,19 +21,19 @@ import ( //go:embed sample.conf var sampleConfig string -type RoleType int +type roleType int const ( - Unknown RoleType = iota - Leader - Follower + unknown roleType = iota + leader + follower ) -func (r RoleType) String() string { +func (r roleType) String() string { switch r { - case Leader: + case leader: return "leader" - case Follower: + case follower: return "follower" default: return "unknown" @@ -45,7 +45,7 @@ var ( defaultRoles = []string{"leader", "follower"} ) -type Vars map[string]interface{} +type vars map[string]interface{} type Aurora struct { Schedulers []string `toml:"schedulers"` @@ -136,7 +136,7 @@ func (a *Aurora) initialize() error { return nil } -func (a *Aurora) roleEnabled(role RoleType) bool { +func (a *Aurora) roleEnabled(role roleType) bool { if len(a.Roles) == 0 { return true } @@ -149,12 +149,12 @@ func (a *Aurora) roleEnabled(role RoleType) bool { return false } -func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, error) { +func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (roleType, error) { loc := *origin loc.Path = "leaderhealth" req, err := http.NewRequest("GET", loc.String(), nil) if err != nil { - return Unknown, err + return unknown, err } if a.Username != "" || a.Password != "" { @@ -164,26 +164,26 @@ func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, err resp, err := a.client.Do(req.WithContext(ctx)) if err != nil { - return Unknown, err + return unknown, err } if err := resp.Body.Close(); err != nil { - return Unknown, fmt.Errorf("closing body failed: %w", err) + return unknown, fmt.Errorf("closing body failed: %w", err) } switch resp.StatusCode { case http.StatusOK: - return Leader, nil + return leader, nil case http.StatusBadGateway: fallthrough case http.StatusServiceUnavailable: - return Follower, nil + return follower, nil default: - return Unknown, fmt.Errorf("%v", resp.Status) + return unknown, fmt.Errorf("%v", resp.Status) } } func (a *Aurora) gatherScheduler( - ctx context.Context, origin *url.URL, role RoleType, acc telegraf.Accumulator, + ctx context.Context, origin *url.URL, role roleType, acc telegraf.Accumulator, ) error { loc := *origin loc.Path = "vars.json" @@ -207,16 +207,16 @@ func (a *Aurora) gatherScheduler( return fmt.Errorf("%v", resp.Status) } - var vars Vars + var metrics vars decoder := json.NewDecoder(resp.Body) decoder.UseNumber() - err = decoder.Decode(&vars) + err = decoder.Decode(&metrics) if err != nil { return fmt.Errorf("decoding response: %w", err) } - var fields = make(map[string]interface{}, len(vars)) - for k, v := range vars { + var fields = make(map[string]interface{}, len(metrics)) + for k, v := range metrics { switch v := v.(type) { case json.Number: // Aurora encodes numbers as you would specify them as a literal, diff --git a/plugins/inputs/aurora/aurora_test.go b/plugins/inputs/aurora/aurora_test.go index 6eeaa93f4..b43949f25 100644 --- a/plugins/inputs/aurora/aurora_test.go +++ b/plugins/inputs/aurora/aurora_test.go @@ -12,8 +12,8 @@ import ( ) type ( - TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) - CheckFunc func(t *testing.T, err error, acc *testutil.Accumulator) + testHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + checkFunc func(t *testing.T, err error, acc *testutil.Accumulator) ) func TestAurora(t *testing.T) { @@ -28,9 +28,9 @@ func TestAurora(t *testing.T) { plugin *Aurora schedulers []string roles []string - leaderhealth TestHandlerFunc - varsjson TestHandlerFunc - check CheckFunc + leaderhealth testHandlerFunc + varsjson testHandlerFunc + check checkFunc }{ { name: "minimal", diff --git a/plugins/inputs/azure_monitor/azure_monitor.go b/plugins/inputs/azure_monitor/azure_monitor.go index 610007a29..93b3627bf 100644 --- a/plugins/inputs/azure_monitor/azure_monitor.go +++ b/plugins/inputs/azure_monitor/azure_monitor.go @@ -21,9 +21,9 @@ type AzureMonitor struct { ClientSecret string `toml:"client_secret"` TenantID string `toml:"tenant_id"` CloudOption string `toml:"cloud_option,omitempty"` - ResourceTargets []*ResourceTarget `toml:"resource_target"` - ResourceGroupTargets []*ResourceGroupTarget `toml:"resource_group_target"` - SubscriptionTargets []*Resource `toml:"subscription_target"` + ResourceTargets []*resourceTarget `toml:"resource_target"` + ResourceGroupTargets []*resourceGroupTarget `toml:"resource_group_target"` + SubscriptionTargets []*resource `toml:"subscription_target"` Log telegraf.Logger `toml:"-"` receiver *receiver.AzureMonitorMetricsReceiver @@ -31,18 +31,18 @@ type AzureMonitor struct { azureClients *receiver.AzureClients } -type ResourceTarget struct { +type resourceTarget struct { ResourceID string `toml:"resource_id"` Metrics []string `toml:"metrics"` Aggregations []string `toml:"aggregations"` } -type ResourceGroupTarget struct { +type resourceGroupTarget struct { ResourceGroup string `toml:"resource_group"` - Resources []*Resource `toml:"resource"` + Resources []*resource `toml:"resource"` } -type Resource struct { +type resource struct { ResourceType string `toml:"resource_type"` Metrics []string `toml:"metrics"` Aggregations []string `toml:"aggregations"` @@ -62,7 +62,6 @@ func (am *AzureMonitor) SampleConfig() string { return sampleConfig } -// Init is for setup, and validating config. func (am *AzureMonitor) Init() error { var clientOptions azcore.ClientOptions switch am.CloudOption { diff --git a/plugins/inputs/azure_storage_queue/azure_storage_queue.go b/plugins/inputs/azure_storage_queue/azure_storage_queue.go index d65a8fe7b..d8100e7a5 100644 --- a/plugins/inputs/azure_storage_queue/azure_storage_queue.go +++ b/plugins/inputs/azure_storage_queue/azure_storage_queue.go @@ -42,45 +42,8 @@ func (a *AzureStorageQueue) Init() error { return nil } -func (a *AzureStorageQueue) GetServiceURL() (azqueue.ServiceURL, error) { - if a.serviceURL == nil { - _url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net") - if err != nil { - return azqueue.ServiceURL{}, err - } - - credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey) - if err != nil { - return azqueue.ServiceURL{}, err - } - - pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{}) - - serviceURL := azqueue.NewServiceURL(*_url, pipeline) - a.serviceURL = &serviceURL - } - return *a.serviceURL, nil -} - -func (a *AzureStorageQueue) GatherQueueMetrics( - acc telegraf.Accumulator, - queueItem azqueue.QueueItem, - properties *azqueue.QueueGetPropertiesResponse, - peekedMessage *azqueue.PeekedMessage, -) { - fields := make(map[string]interface{}) - tags := make(map[string]string) - tags["queue"] = strings.TrimSpace(queueItem.Name) - tags["account"] = a.StorageAccountName - fields["size"] = properties.ApproximateMessagesCount() - if peekedMessage != nil { - fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano() - } - acc.AddFields("azure_storage_queues", fields, tags) -} - func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error { - serviceURL, err := a.GetServiceURL() + serviceURL, err := a.getServiceURL() if err != nil { return err } @@ -117,12 +80,49 @@ func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error { } } - a.GatherQueueMetrics(acc, queueItem, properties, peekedMessage) + a.gatherQueueMetrics(acc, queueItem, properties, peekedMessage) } } return nil } +func (a *AzureStorageQueue) getServiceURL() (azqueue.ServiceURL, error) { + if a.serviceURL == nil { + _url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net") + if err != nil { + return azqueue.ServiceURL{}, err + } + + credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey) + if err != nil { + return azqueue.ServiceURL{}, err + } + + pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{}) + + serviceURL := azqueue.NewServiceURL(*_url, pipeline) + a.serviceURL = &serviceURL + } + return *a.serviceURL, nil +} + +func (a *AzureStorageQueue) gatherQueueMetrics( + acc telegraf.Accumulator, + queueItem azqueue.QueueItem, + properties *azqueue.QueueGetPropertiesResponse, + peekedMessage *azqueue.PeekedMessage, +) { + fields := make(map[string]interface{}) + tags := make(map[string]string) + tags["queue"] = strings.TrimSpace(queueItem.Name) + tags["account"] = a.StorageAccountName + fields["size"] = properties.ApproximateMessagesCount() + if peekedMessage != nil { + fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano() + } + acc.AddFields("azure_storage_queues", fields, tags) +} + func init() { inputs.Add("azure_storage_queue", func() telegraf.Input { return &AzureStorageQueue{PeekOldestMessageAge: true} diff --git a/plugins/inputs/bcache/bcache.go b/plugins/inputs/bcache/bcache.go index 2157fad55..37114a2d9 100644 --- a/plugins/inputs/bcache/bcache.go +++ b/plugins/inputs/bcache/bcache.go @@ -20,8 +20,44 @@ import ( var sampleConfig string type Bcache struct { - BcachePath string - BcacheDevs []string + BcachePath string `toml:"bcachePath"` + BcacheDevs []string `toml:"bcacheDevs"` +} + +func (*Bcache) SampleConfig() string { + return sampleConfig +} + +func (b *Bcache) Gather(acc telegraf.Accumulator) error { + bcacheDevsChecked := make(map[string]bool) + var restrictDevs bool + if len(b.BcacheDevs) != 0 { + restrictDevs = true + for _, bcacheDev := range b.BcacheDevs { + bcacheDevsChecked[bcacheDev] = true + } + } + + bcachePath := b.BcachePath + if len(bcachePath) == 0 { + bcachePath = "/sys/fs/bcache" + } + bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") + if len(bdevs) < 1 || err != nil { + return errors.New("can't find any bcache device") + } + for _, bdev := range bdevs { + if restrictDevs { + bcacheDev := getTags(bdev)["bcache_dev"] + if !bcacheDevsChecked[bcacheDev] { + continue + } + } + if err := b.gatherBcache(bdev, acc); err != nil { + return fmt.Errorf("gathering bcache failed: %w", err) + } + } + return nil } func getTags(bdev string) map[string]string { @@ -102,42 +138,6 @@ func (b *Bcache) gatherBcache(bdev string, acc telegraf.Accumulator) error { return nil } -func (*Bcache) SampleConfig() string { - return sampleConfig -} - -func (b *Bcache) Gather(acc telegraf.Accumulator) error { - bcacheDevsChecked := make(map[string]bool) - var restrictDevs bool - if len(b.BcacheDevs) != 0 { - restrictDevs = true - for _, bcacheDev := range b.BcacheDevs { - bcacheDevsChecked[bcacheDev] = true - } - } - - bcachePath := b.BcachePath - if len(bcachePath) == 0 { - bcachePath = "/sys/fs/bcache" - } - bdevs, err := filepath.Glob(bcachePath + "/*/bdev*") - if len(bdevs) < 1 || err != nil { - return errors.New("can't find any bcache device") - } - for _, bdev := range bdevs { - if restrictDevs { - bcacheDev := getTags(bdev)["bcache_dev"] - if !bcacheDevsChecked[bcacheDev] { - continue - } - } - if err := b.gatherBcache(bdev, acc); err != nil { - return fmt.Errorf("gathering bcache failed: %w", err) - } - } - return nil -} - func init() { inputs.Add("bcache", func() telegraf.Input { return &Bcache{} diff --git a/plugins/inputs/beat/beat.go b/plugins/inputs/beat/beat.go index 3296f95cc..e4b121ada 100644 --- a/plugins/inputs/beat/beat.go +++ b/plugins/inputs/beat/beat.go @@ -20,23 +20,10 @@ import ( //go:embed sample.conf var sampleConfig string -const suffixInfo = "/" -const suffixStats = "/stats" - -type Info struct { - Beat string `json:"beat"` - Hostname string `json:"hostname"` - Name string `json:"name"` - UUID string `json:"uuid"` - Version string `json:"version"` -} - -type Stats struct { - Beat map[string]interface{} `json:"beat"` - FileBeat interface{} `json:"filebeat"` - Libbeat interface{} `json:"libbeat"` - System interface{} `json:"system"` -} +const ( + suffixInfo = "/" + suffixStats = "/stats" +) type Beat struct { URL string `toml:"url"` @@ -54,14 +41,19 @@ type Beat struct { client *http.Client } -func NewBeat() *Beat { - return &Beat{ - URL: "http://127.0.0.1:5066", - Includes: []string{"beat", "libbeat", "filebeat"}, - Method: "GET", - Headers: make(map[string]string), - Timeout: config.Duration(time.Second * 5), - } +type info struct { + Beat string `json:"beat"` + Hostname string `json:"hostname"` + Name string `json:"name"` + UUID string `json:"uuid"` + Version string `json:"version"` +} + +type stats struct { + Beat map[string]interface{} `json:"beat"` + FileBeat interface{} `json:"filebeat"` + LibBeat interface{} `json:"libbeat"` + System interface{} `json:"system"` } func (*Beat) SampleConfig() string { @@ -86,6 +78,67 @@ func (beat *Beat) Init() error { return nil } +func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { + beatStats := &stats{} + beatInfo := &info{} + + infoURL, err := url.Parse(beat.URL + suffixInfo) + if err != nil { + return err + } + statsURL, err := url.Parse(beat.URL + suffixStats) + if err != nil { + return err + } + + err = beat.gatherJSONData(infoURL.String(), beatInfo) + if err != nil { + return err + } + tags := map[string]string{ + "beat_beat": beatInfo.Beat, + "beat_id": beatInfo.UUID, + "beat_name": beatInfo.Name, + "beat_host": beatInfo.Hostname, + "beat_version": beatInfo.Version, + } + + err = beat.gatherJSONData(statsURL.String(), beatStats) + if err != nil { + return err + } + + for _, name := range beat.Includes { + var stats interface{} + var metric string + + switch name { + case "beat": + stats = beatStats.Beat + metric = "beat" + case "filebeat": + stats = beatStats.FileBeat + metric = "beat_filebeat" + case "system": + stats = beatStats.System + metric = "beat_system" + case "libbeat": + stats = beatStats.LibBeat + metric = "beat_libbeat" + default: + return fmt.Errorf("unknown stats-type %q", name) + } + flattener := parsers_json.JSONFlattener{} + err := flattener.FullFlattenJSON("", stats, true, true) + if err != nil { + return err + } + accumulator.AddFields(metric, flattener.Fields, tags) + } + + return nil +} + // createHTTPClient create a clients to access API func (beat *Beat) createHTTPClient() (*http.Client, error) { tlsConfig, err := beat.ClientConfig.TLSConfig() @@ -130,69 +183,18 @@ func (beat *Beat) gatherJSONData(address string, value interface{}) error { return json.NewDecoder(response.Body).Decode(value) } -func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { - beatStats := &Stats{} - beatInfo := &Info{} - - infoURL, err := url.Parse(beat.URL + suffixInfo) - if err != nil { - return err +func newBeat() *Beat { + return &Beat{ + URL: "http://127.0.0.1:5066", + Includes: []string{"beat", "libbeat", "filebeat"}, + Method: "GET", + Headers: make(map[string]string), + Timeout: config.Duration(time.Second * 5), } - statsURL, err := url.Parse(beat.URL + suffixStats) - if err != nil { - return err - } - - err = beat.gatherJSONData(infoURL.String(), beatInfo) - if err != nil { - return err - } - tags := map[string]string{ - "beat_beat": beatInfo.Beat, - "beat_id": beatInfo.UUID, - "beat_name": beatInfo.Name, - "beat_host": beatInfo.Hostname, - "beat_version": beatInfo.Version, - } - - err = beat.gatherJSONData(statsURL.String(), beatStats) - if err != nil { - return err - } - - for _, name := range beat.Includes { - var stats interface{} - var metric string - - switch name { - case "beat": - stats = beatStats.Beat - metric = "beat" - case "filebeat": - stats = beatStats.FileBeat - metric = "beat_filebeat" - case "system": - stats = beatStats.System - metric = "beat_system" - case "libbeat": - stats = beatStats.Libbeat - metric = "beat_libbeat" - default: - return fmt.Errorf("unknown stats-type %q", name) - } - flattener := parsers_json.JSONFlattener{} - err := flattener.FullFlattenJSON("", stats, true, true) - if err != nil { - return err - } - accumulator.AddFields(metric, flattener.Fields, tags) - } - - return nil } func init() { inputs.Add("beat", func() telegraf.Input { - return NewBeat() + return newBeat() }) } diff --git a/plugins/inputs/beat/beat_test.go b/plugins/inputs/beat/beat_test.go index fae8818f4..c9b9d6071 100644 --- a/plugins/inputs/beat/beat_test.go +++ b/plugins/inputs/beat/beat_test.go @@ -16,7 +16,7 @@ import ( func Test_BeatStats(t *testing.T) { var beat6StatsAccumulator testutil.Accumulator - var beatTest = NewBeat() + var beatTest = newBeat() // System stats are disabled by default beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"} require.NoError(t, beatTest.Init()) @@ -160,7 +160,7 @@ func Test_BeatStats(t *testing.T) { func Test_BeatRequest(t *testing.T) { var beat6StatsAccumulator testutil.Accumulator - beatTest := NewBeat() + beatTest := newBeat() // System stats are disabled by default beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"} require.NoError(t, beatTest.Init()) diff --git a/plugins/inputs/bond/bond.go b/plugins/inputs/bond/bond.go index a7caf1a7d..fb5bded85 100644 --- a/plugins/inputs/bond/bond.go +++ b/plugins/inputs/bond/bond.go @@ -17,13 +17,12 @@ import ( //go:embed sample.conf var sampleConfig string -// default host proc path -const defaultHostProc = "/proc" -const defaultHostSys = "/sys" - -// env host proc variable name -const envProc = "HOST_PROC" -const envSys = "HOST_SYS" +const ( + defaultHostProc = "/proc" + defaultHostSys = "/sys" + envProc = "HOST_PROC" + envSys = "HOST_SYS" +) type Bond struct { HostProc string `toml:"host_proc"` diff --git a/plugins/inputs/burrow/burrow.go b/plugins/inputs/burrow/burrow.go index c58f2f24b..0cdf8a00b 100644 --- a/plugins/inputs/burrow/burrow.go +++ b/plugins/inputs/burrow/burrow.go @@ -31,7 +31,7 @@ const ( ) type ( - burrow struct { + Burrow struct { tls.ClientConfig Servers []string @@ -91,17 +91,11 @@ type ( } ) -func init() { - inputs.Add("burrow", func() telegraf.Input { - return &burrow{} - }) -} - -func (*burrow) SampleConfig() string { +func (*Burrow) SampleConfig() string { return sampleConfig } -func (b *burrow) Gather(acc telegraf.Accumulator) error { +func (b *Burrow) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup if len(b.Servers) == 0 { @@ -141,7 +135,7 @@ func (b *burrow) Gather(acc telegraf.Accumulator) error { return nil } -func (b *burrow) setDefaults() { +func (b *Burrow) setDefaults() { if b.APIPrefix == "" { b.APIPrefix = defaultBurrowPrefix } @@ -153,7 +147,7 @@ func (b *burrow) setDefaults() { } } -func (b *burrow) compileGlobs() error { +func (b *Burrow) compileGlobs() error { var err error // compile glob patterns @@ -172,7 +166,7 @@ func (b *burrow) compileGlobs() error { return nil } -func (b *burrow) createClient() (*http.Client, error) { +func (b *Burrow) createClient() (*http.Client, error) { tlsCfg, err := b.ClientConfig.TLSConfig() if err != nil { return nil, err @@ -199,7 +193,7 @@ func (b *burrow) createClient() (*http.Client, error) { return client, nil } -func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) { +func (b *Burrow) getResponse(u *url.URL) (*apiResponse, error) { req, err := http.NewRequest(http.MethodGet, u.String(), nil) if err != nil { return nil, err @@ -224,7 +218,7 @@ func (b *burrow) getResponse(u *url.URL) (*apiResponse, error) { return ares, dec.Decode(ares) } -func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { +func (b *Burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { var wg sync.WaitGroup r, err := b.getResponse(src) @@ -263,7 +257,7 @@ func (b *burrow) gatherServer(src *url.URL, acc telegraf.Accumulator) error { return nil } -func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { +func (b *Burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { var wg sync.WaitGroup r, err := b.getResponse(src) @@ -302,7 +296,7 @@ func (b *burrow) gatherTopics(guard chan struct{}, src *url.URL, cluster string, wg.Wait() } -func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) { +func (b *Burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc telegraf.Accumulator) { for i, offset := range r.Offsets { tags := map[string]string{ "cluster": cluster, @@ -320,7 +314,7 @@ func (b *burrow) genTopicMetrics(r *apiResponse, cluster, topic string, acc tele } } -func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { +func (b *Burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, acc telegraf.Accumulator) { var wg sync.WaitGroup r, err := b.getResponse(src) @@ -360,7 +354,7 @@ func (b *burrow) gatherGroups(guard chan struct{}, src *url.URL, cluster string, wg.Wait() } -func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { +func (b *Burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { partitionCount := r.Status.PartitionCount if partitionCount == 0 { partitionCount = len(r.Status.Partitions) @@ -399,7 +393,7 @@ func (b *burrow) genGroupStatusMetrics(r *apiResponse, cluster, group string, ac ) } -func (b *burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { +func (b *Burrow) genGroupLagMetrics(r *apiResponse, cluster, group string, acc telegraf.Accumulator) { for _, partition := range r.Status.Partitions { if !b.filterTopics.Match(partition.Topic) { continue @@ -455,3 +449,9 @@ func mapStatusToCode(src string) int { return 0 } } + +func init() { + inputs.Add("burrow", func() telegraf.Input { + return &Burrow{} + }) +} diff --git a/plugins/inputs/burrow/burrow_test.go b/plugins/inputs/burrow/burrow_test.go index 358af7447..aefa78202 100644 --- a/plugins/inputs/burrow/burrow_test.go +++ b/plugins/inputs/burrow/burrow_test.go @@ -73,7 +73,7 @@ func TestBurrowTopic(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{Servers: []string{s.URL}} + plugin := &Burrow{Servers: []string{s.URL}} acc := &testutil.Accumulator{} require.NoError(t, plugin.Gather(acc)) @@ -102,7 +102,7 @@ func TestBurrowPartition(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, } acc := &testutil.Accumulator{} @@ -150,7 +150,7 @@ func TestBurrowGroup(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, } acc := &testutil.Accumulator{} @@ -188,7 +188,7 @@ func TestMultipleServers(t *testing.T) { s2 := getHTTPServer() defer s2.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s1.URL, s2.URL}, } acc := &testutil.Accumulator{} @@ -203,7 +203,7 @@ func TestMultipleRuns(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, } for i := 0; i < 4; i++ { @@ -220,7 +220,7 @@ func TestBasicAuthConfig(t *testing.T) { s := getHTTPServerBasicAuth() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, Username: "test", Password: "test", @@ -238,7 +238,7 @@ func TestFilterClusters(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, ClustersInclude: []string{"wrongname*"}, // clustername1 -> no match } @@ -256,7 +256,7 @@ func TestFilterGroups(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, GroupsInclude: []string{"group?"}, // group1 -> match TopicsExclude: []string{"*"}, // exclude all @@ -274,7 +274,7 @@ func TestFilterTopics(t *testing.T) { s := getHTTPServer() defer s.Close() - plugin := &burrow{ + plugin := &Burrow{ Servers: []string{s.URL}, TopicsInclude: []string{"topic?"}, // topicA -> match GroupsExclude: []string{"*"}, // exclude all