feat(inputs.kubernetes): Allow fetching kublet metrics remotely (#12301)

Co-authored-by: Maxim Ivanov <hi@yamlcoder.me>
This commit is contained in:
Maxim Ivanov 2022-12-09 15:56:24 +00:00 committed by GitHub
parent 4161651fed
commit df3b23de3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 3 deletions

View File

@ -48,7 +48,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Read metrics from the kubernetes kubelet api
[[inputs.kubernetes]]
## URL for the kubelet
## URL for the kubelet, if empty read metrics from all nodes in the cluster
url = "http://127.0.0.1:10255"
## Use bearer token for authorization. ('bearer_token' takes priority)

View File

@ -2,12 +2,18 @@
package kubernetes
import (
"context"
_ "embed"
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
@ -38,6 +44,8 @@ type Kubernetes struct {
tls.ClientConfig
Log telegraf.Logger `toml:"-"`
RoundTripper http.RoundTripper
}
@ -70,15 +78,83 @@ func (k *Kubernetes) Init() error {
}
k.labelFilter = labelFilter
if k.URL == "" {
k.InsecureSkipVerify = true
}
return nil
}
// Gather collects kubernetes metrics from a given URL
func (k *Kubernetes) Gather(acc telegraf.Accumulator) error {
acc.AddError(k.gatherSummary(k.URL, acc))
if k.URL != "" {
acc.AddError(k.gatherSummary(k.URL, acc))
return nil
}
var wg sync.WaitGroup
nodeBaseURLs, err := getNodeURLs(k.Log)
if err != nil {
return err
}
for _, url := range nodeBaseURLs {
wg.Add(1)
go func(url string) {
defer wg.Done()
acc.AddError(k.gatherSummary(url, acc))
}(url)
}
wg.Wait()
return nil
}
func getNodeURLs(log telegraf.Logger) ([]string, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
nodes, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
return nil, err
}
nodeUrls := make([]string, 0, len(nodes.Items))
for _, n := range nodes.Items {
address := getNodeAddress(n)
if address == "" {
log.Warn("Unable to node addresses for Node '%s'", n.Name)
continue
}
nodeUrls = append(nodeUrls, "https://"+address+":10250")
}
return nodeUrls, nil
}
// Prefer internal addresses, if none found, use ExternalIP
func getNodeAddress(node v1.Node) string {
extAddresses := make([]string, 0)
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP {
return addr.Address
}
extAddresses = append(extAddresses, addr.Address)
}
if len(extAddresses) > 0 {
return extAddresses[0]
}
return ""
}
func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
summaryMetrics := &SummaryMetrics{}
err := k.LoadJSON(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics)

View File

@ -1,6 +1,6 @@
# Read metrics from the kubernetes kubelet api
[[inputs.kubernetes]]
## URL for the kubelet
## URL for the kubelet, if empty read metrics from all nodes in the cluster
url = "http://127.0.0.1:10255"
## Use bearer token for authorization. ('bearer_token' takes priority)