feat(inputs.kube_inventory): Support using kubelet to get pods data (#13996)

This commit is contained in:
Noy-Simon 2023-10-04 17:55:18 +03:00 committed by GitHub
parent 01b5834cb7
commit f8a5a17b47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 83 additions and 13 deletions

View File

@ -52,6 +52,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## If empty in-cluster config with POD's service account token will be used. ## If empty in-cluster config with POD's service account token will be used.
# url = "" # url = ""
## URL for the kubelet, if set it will be used to collect the pods resource metrics
# url_kubelet = "http://127.0.0.1:10255"
## Namespace to use. Set to "" to use all namespaces. ## Namespace to use. Set to "" to use all namespaces.
# namespace = "default" # namespace = "default"

View File

@ -2,6 +2,7 @@ package kube_inventory
import ( import (
"context" "context"
"net/http"
"time" "time"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
@ -12,6 +13,7 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/common/tls"
) )
@ -22,16 +24,16 @@ type client struct {
} }
func newClient(baseURL, namespace, bearerTokenFile string, bearerToken string, timeout time.Duration, tlsConfig tls.ClientConfig) (*client, error) { func newClient(baseURL, namespace, bearerTokenFile string, bearerToken string, timeout time.Duration, tlsConfig tls.ClientConfig) (*client, error) {
var config *rest.Config var clientConfig *rest.Config
var err error var err error
if baseURL == "" { if baseURL == "" {
config, err = rest.InClusterConfig() clientConfig, err = rest.InClusterConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else { } else {
config = &rest.Config{ clientConfig = &rest.Config{
TLSClientConfig: rest.TLSClientConfig{ TLSClientConfig: rest.TLSClientConfig{
ServerName: tlsConfig.ServerName, ServerName: tlsConfig.ServerName,
Insecure: tlsConfig.InsecureSkipVerify, Insecure: tlsConfig.InsecureSkipVerify,
@ -44,13 +46,13 @@ func newClient(baseURL, namespace, bearerTokenFile string, bearerToken string, t
} }
if bearerTokenFile != "" { if bearerTokenFile != "" {
config.BearerTokenFile = bearerTokenFile clientConfig.BearerTokenFile = bearerTokenFile
} else if bearerToken != "" { } else if bearerToken != "" {
config.BearerToken = bearerToken clientConfig.BearerToken = bearerToken
} }
} }
c, err := kubernetes.NewForConfig(config) c, err := kubernetes.NewForConfig(clientConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -62,6 +64,21 @@ func newClient(baseURL, namespace, bearerTokenFile string, bearerToken string, t
}, nil }, nil
} }
func newHTTPClient(tlsConfig tls.ClientConfig, bearerTokenFile string, responseTimeout config.Duration) (*http.Client, error) {
tlsCfg, err := tlsConfig.TLSConfig()
if err != nil {
return nil, err
}
clientConfig := &rest.Config{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
ContentConfig: rest.ContentConfig{},
Timeout: time.Duration(responseTimeout),
BearerTokenFile: bearerTokenFile,
}
return rest.HTTPClientFor(clientConfig)
}
func (c *client) getDaemonSets(ctx context.Context) (*appsv1.DaemonSetList, error) { func (c *client) getDaemonSets(ctx context.Context) (*appsv1.DaemonSetList, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout) ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel() defer cancel()
@ -111,6 +128,7 @@ func (c *client) getPersistentVolumeClaims(ctx context.Context) (*corev1.Persist
func (c *client) getPods(ctx context.Context, nodeName string) (*corev1.PodList, error) { func (c *client) getPods(ctx context.Context, nodeName string) (*corev1.PodList, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeout) ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel() defer cancel()
var fieldSelector string var fieldSelector string
if nodeName != "" { if nodeName != "" {
fieldSelector = "spec.nodeName=" + nodeName fieldSelector = "spec.nodeName=" + nodeName

View File

@ -4,7 +4,9 @@ package kube_inventory
import ( import (
"context" "context"
_ "embed" _ "embed"
"encoding/json"
"fmt" "fmt"
"net/http"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -28,6 +30,7 @@ const (
// KubernetesInventory represents the config object for the plugin. // KubernetesInventory represents the config object for the plugin.
type KubernetesInventory struct { type KubernetesInventory struct {
URL string `toml:"url"` URL string `toml:"url"`
KubeletURL string `toml:"url_kubelet"`
BearerToken string `toml:"bearer_token"` BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string" deprecated:"1.24.0;use 'BearerToken' with a file instead"` BearerTokenString string `toml:"bearer_token_string" deprecated:"1.24.0;use 'BearerToken' with a file instead"`
Namespace string `toml:"namespace"` Namespace string `toml:"namespace"`
@ -36,13 +39,15 @@ type KubernetesInventory struct {
ResourceInclude []string `toml:"resource_include"` ResourceInclude []string `toml:"resource_include"`
MaxConfigMapAge config.Duration `toml:"max_config_map_age"` MaxConfigMapAge config.Duration `toml:"max_config_map_age"`
SelectorInclude []string `toml:"selector_include"` SelectorInclude []string `toml:"selector_include"`
SelectorExclude []string `toml:"selector_exclude"` SelectorExclude []string `toml:"selector_exclude"`
NodeName string `toml:"node_name"`
Log telegraf.Logger `toml:"-"` NodeName string `toml:"node_name"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig tls.ClientConfig
client *client client *client
httpClient *http.Client
selectorFilter filter.Filter selectorFilter filter.Filter
} }
@ -67,7 +72,17 @@ func (ki *KubernetesInventory) Init() error {
if err != nil { if err != nil {
return err return err
} }
if ki.ResponseTimeout < config.Duration(time.Second) {
ki.ResponseTimeout = config.Duration(time.Second * 5)
}
// Only create an http client if we have a kubelet url
if ki.KubeletURL != "" {
ki.httpClient, err = newHTTPClient(ki.ClientConfig, ki.BearerToken, ki.ResponseTimeout)
if err != nil {
ki.Log.Warnf("unable to create http client: %v", err)
}
}
return nil return nil
} }
@ -140,6 +155,28 @@ func (ki *KubernetesInventory) convertQuantity(s string, m float64) int64 {
} }
return int64(f * m) return int64(f * m)
} }
func (ki *KubernetesInventory) queryPodsFromKubelet(url string, v interface{}) error {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return fmt.Errorf("creating new http request for url %s failed: %w", url, err)
}
req.Header.Add("Accept", "application/json")
resp, err := ki.httpClient.Do(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %q: %w", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}
if err := json.NewDecoder(resp.Body).Decode(v); err != nil {
return fmt.Errorf("error parsing response: %w", err)
}
return nil
}
func (ki *KubernetesInventory) createSelectorFilters() error { func (ki *KubernetesInventory) createSelectorFilters() error {
selectorFilter, err := filter.NewIncludeExcludeFilter(ki.SelectorInclude, ki.SelectorExclude) selectorFilter, err := filter.NewIncludeExcludeFilter(ki.SelectorInclude, ki.SelectorExclude)

View File

@ -2,6 +2,7 @@ package kube_inventory
import ( import (
"context" "context"
"fmt"
"strings" "strings"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
@ -10,13 +11,21 @@ import (
) )
func collectPods(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { func collectPods(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getPods(ctx, ki.NodeName) var list corev1.PodList
listRef := &list
var err error
if ki.KubeletURL != "" {
err = ki.queryPodsFromKubelet(fmt.Sprintf("%s/pods", ki.KubeletURL), listRef)
} else {
listRef, err = ki.client.getPods(ctx, ki.NodeName)
}
if err != nil { if err != nil {
acc.AddError(err) acc.AddError(err)
return return
} }
for _, p := range list.Items { for _, p := range listRef.Items {
ki.gatherPod(p, acc) ki.gatherPod(p, acc)
} }
} }

View File

@ -4,6 +4,9 @@
## If empty in-cluster config with POD's service account token will be used. ## If empty in-cluster config with POD's service account token will be used.
# url = "" # url = ""
## URL for the kubelet, if set it will be used to collect the pods resource metrics
# url_kubelet = "http://127.0.0.1:10255"
## Namespace to use. Set to "" to use all namespaces. ## Namespace to use. Set to "" to use all namespaces.
# namespace = "default" # namespace = "default"