Prometheus [Input] plugin - Optimize for larger Kubernetes clusters (500+ pods) when scraping through 'monitor_kubernetes_pods' (#8762)
This commit is contained in:
parent aabec054a7
commit d7df2c546b
@@ -204,6 +204,8 @@ following works:
- gopkg.in/tomb.v2 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v2/LICENSE)
- gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE)
- gopkg.in/yaml.v3 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v3/LICENSE)
- k8s.io/apimachinery [Apache License 2.0](https://github.com/kubernetes/apimachinery/blob/master/LICENSE)
- k8s.io/klog [Apache License 2.0](https://github.com/kubernetes/klog/blob/master/LICENSE)
- modernc.org/libc [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/libc/-/blob/master/LICENSE)
- modernc.org/memory [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/memory/-/blob/master/LICENSE)
- modernc.org/sqlite [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/sqlite/-/blob/master/LICENSE)
@@ -33,6 +33,16 @@ in Prometheus format.
  ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
  ## - prometheus.io/port: If port is not 9102 use this annotation
  # monitor_kubernetes_pods = true
  ## Get the list of pods to scrape with either the scope of
  ## - cluster: the kubernetes watch api (default, no need to specify)
  ## - node: the local cadvisor api; for scalability. Note that the config node_ip or the environment variable NODE_IP must be set to the host IP.
  # pod_scrape_scope = "cluster"
  ## Only for node scrape scope: node IP of the node that telegraf is running on.
  ## Either this config or the environment variable NODE_IP must be set.
  # node_ip = "10.180.1.1"
  ## Only for node scrape scope: interval in seconds for how often to get updated pod list for scraping.
  ## Default is 60 seconds.
  # pod_scrape_interval = 60
  ## Restricts Kubernetes monitoring to a single namespace
  ## ex: monitor_kubernetes_pods_namespace = "default"
  # monitor_kubernetes_pods_namespace = ""
@@ -88,6 +98,17 @@ Currently the following annotations are supported:

Using the `monitor_kubernetes_pods_namespace` option allows you to limit which pods you are scraping.

Using `pod_scrape_scope = "node"` allows more scalable scraping: Telegraf will scrape only the pods on the node it is running on, fetching the pod list locally from the node's kubelet. This requires running Telegraf on every node of the cluster. Note that either `node_ip` must be specified in the config or the environment variable `NODE_IP` must be set to the host IP. The latter can be done in the yaml of the pod running telegraf:
```
env:
  - name: NODE_IP
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP
```

If using node level scrape scope, `pod_scrape_interval` specifies how often (in seconds) the pod list for scraping should be updated. If not specified, the default is 60 seconds.
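Putting these options together, a minimal sketch of a node-scoped configuration could look like this (the `[[inputs.prometheus]]` section name is Telegraf's standard plugin table; the IP address and interval values below are illustrative, not required defaults):

```toml
[[inputs.prometheus]]
  ## Scrape Kubernetes pods annotated with prometheus.io/scrape = "true".
  monitor_kubernetes_pods = true
  ## Limit scraping to the pods on the node Telegraf runs on.
  pod_scrape_scope = "node"
  ## Host IP of that node; alternatively set the NODE_IP environment variable.
  node_ip = "10.180.1.1"
  ## Refresh the local pod list every 60 seconds (the default).
  pod_scrape_interval = 60
  ## Optionally restrict scraping to a single namespace.
  # monitor_kubernetes_pods_namespace = "default"
```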
#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
@@ -2,10 +2,12 @@ package prometheus

import (
    "context"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net"
    "net/http"
    "net/url"
    "os/user"
    "path/filepath"
@@ -15,6 +17,8 @@ import (
    "github.com/ericchiang/k8s"
    corev1 "github.com/ericchiang/k8s/apis/core/v1"
    "github.com/ghodss/yaml"
    "github.com/kubernetes/apimachinery/pkg/fields"
    "github.com/kubernetes/apimachinery/pkg/labels"
)

type payload struct {
@@ -22,6 +26,20 @@ type payload struct {
    pod *corev1.Pod
}

type podMetadata struct {
    ResourceVersion string `json:"resourceVersion"`
    SelfLink        string `json:"selfLink"`
}

type podResponse struct {
    Kind       string        `json:"kind"`
    ApiVersion string        `json:"apiVersion"`
    Metadata   podMetadata   `json:"metadata"`
    Items      []*corev1.Pod `json:"items,string,omitempty"`
}

const cAdvisorPodListDefaultInterval = 60

// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
@@ -66,9 +84,16 @@ func (p *Prometheus) start(ctx context.Context) error {
         case <-ctx.Done():
             return
         case <-time.After(time.Second):
-            err := p.watch(ctx, client)
-            if err != nil {
-                p.Log.Errorf("Unable to watch resources: %s", err.Error())
+            if p.isNodeScrapeScope {
+                err = p.cAdvisor(ctx, client)
+                if err != nil {
+                    p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
+                }
+            } else {
+                err = p.watch(ctx, client)
+                if err != nil {
+                    p.Log.Errorf("Unable to watch resources: %s", err.Error())
+                }
             }
         }
     }
@@ -126,6 +151,147 @@ func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
    }
}

func (p *Prometheus) cAdvisor(ctx context.Context, client *k8s.Client) error {
    // Set InsecureSkipVerify for cAdvisor client since Node IP will not be a SAN for the CA cert
    tlsConfig := client.Client.Transport.(*http.Transport).TLSClientConfig
    tlsConfig.InsecureSkipVerify = true

    // The request will be the same each time
    podsUrl := fmt.Sprintf("https://%s:10250/pods", p.NodeIP)
    req, err := http.NewRequest("GET", podsUrl, nil)
    if err != nil {
        return fmt.Errorf("Error when creating request to %s to get pod list: %w", podsUrl, err)
    }
    client.SetHeaders(req.Header)

    // Update right away so code is not waiting the length of the specified scrape interval initially
    err = updateCadvisorPodList(ctx, p, client, req)
    if err != nil {
        return fmt.Errorf("Error initially updating pod list: %w", err)
    }

    scrapeInterval := cAdvisorPodListDefaultInterval
    if p.PodScrapeInterval != 0 {
        scrapeInterval = p.PodScrapeInterval
    }

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-time.After(time.Duration(scrapeInterval) * time.Second):
            err := updateCadvisorPodList(ctx, p, client, req)
            if err != nil {
                return fmt.Errorf("Error updating pod list: %w", err)
            }
        }
    }
}

func updateCadvisorPodList(ctx context.Context, p *Prometheus, client *k8s.Client, req *http.Request) error {
    resp, err := client.Client.Do(req)
    if err != nil {
        return fmt.Errorf("Error when making request for pod list: %w", err)
    }

    // If err is nil, still check response code
    if resp.StatusCode != 200 {
        return fmt.Errorf("Error when making request for pod list with status %s", resp.Status)
    }

    defer resp.Body.Close()

    cadvisorPodsResponse := podResponse{}

    // Will have expected type errors for some parts of corev1.Pod struct for some unused fields
    // Instead have nil checks for every used field in case of incorrect decoding
    json.NewDecoder(resp.Body).Decode(&cadvisorPodsResponse)
    pods := cadvisorPodsResponse.Items

    // Updating pod list to be latest cadvisor response
    p.lock.Lock()
    p.kubernetesPods = make(map[string]URLAndAddress)

    // Register pod only if it has an annotation to scrape, if it is ready,
    // and if namespace and selectors are specified and match
    for _, pod := range pods {
        if necessaryPodFieldsArePresent(pod) &&
            pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] == "true" &&
            podReady(pod.GetStatus().GetContainerStatuses()) &&
            podHasMatchingNamespace(pod, p) &&
            podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
            podHasMatchingFieldSelector(pod, p.podFieldSelector) {
            registerPod(pod, p)
        }
    }
    p.lock.Unlock()

    // No errors
    return nil
}

func necessaryPodFieldsArePresent(pod *corev1.Pod) bool {
    return pod.GetMetadata() != nil &&
        pod.GetMetadata().GetAnnotations() != nil &&
        pod.GetMetadata().GetLabels() != nil &&
        pod.GetSpec() != nil &&
        pod.GetStatus() != nil &&
        pod.GetStatus().GetContainerStatuses() != nil
}

/* See the docs on kubernetes label selectors:
 * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
 */
func podHasMatchingLabelSelector(pod *corev1.Pod, labelSelector labels.Selector) bool {
    if labelSelector == nil {
        return true
    }

    var labelsSet labels.Set = pod.GetMetadata().GetLabels()
    return labelSelector.Matches(labelsSet)
}

/* See ToSelectableFields() for list of fields that are selectable:
 * https://github.com/kubernetes/kubernetes/release-1.20/pkg/registry/core/pod/strategy.go
 * See docs on kubernetes field selectors:
 * https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
 */
func podHasMatchingFieldSelector(pod *corev1.Pod, fieldSelector fields.Selector) bool {
    if fieldSelector == nil {
        return true
    }

    podSpec := pod.GetSpec()
    podStatus := pod.GetStatus()

    // Spec and Status shouldn't be nil.
    // Error handling just in case something goes wrong but won't crash telegraf
    if podSpec == nil || podStatus == nil {
        return false
    }

    fieldsSet := make(fields.Set)
    fieldsSet["spec.nodeName"] = podSpec.GetNodeName()
    fieldsSet["spec.restartPolicy"] = podSpec.GetRestartPolicy()
    fieldsSet["spec.schedulerName"] = podSpec.GetSchedulerName()
    fieldsSet["spec.serviceAccountName"] = podSpec.GetServiceAccountName()
    fieldsSet["status.phase"] = podStatus.GetPhase()
    fieldsSet["status.podIP"] = podStatus.GetPodIP()
    fieldsSet["status.nominatedNodeName"] = podStatus.GetNominatedNodeName()

    return fieldSelector.Matches(fieldsSet)
}

/*
 * If a namespace is specified and the pod doesn't have that namespace, return false
 * Else return true
 */
func podHasMatchingNamespace(pod *corev1.Pod, p *Prometheus) bool {
    return !(p.PodNamespace != "" && pod.GetMetadata().GetNamespace() != p.PodNamespace)
}

func podReady(statuss []*corev1.ContainerStatus) bool {
    if len(statuss) == 0 {
        return false
@@ -180,14 +346,19 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
         return
     }
     podURL := p.AddressToURL(URL, URL.Hostname())
-    p.lock.Lock()
+
+    // Locks earlier if using cAdvisor calls - makes a new list each time
+    // rather than updating and removing from the same list
+    if !p.isNodeScrapeScope {
+        p.lock.Lock()
+        defer p.lock.Unlock()
+    }
     p.kubernetesPods[podURL.String()] = URLAndAddress{
         URL:         podURL,
         Address:     URL.Hostname(),
         OriginalURL: URL,
         Tags:        tags,
     }
-    p.lock.Unlock()
 }

 func getScrapeURL(pod *corev1.Pod) *string {
@@ -9,6 +9,9 @@ import (
    v1 "github.com/ericchiang/k8s/apis/core/v1"
    metav1 "github.com/ericchiang/k8s/apis/meta/v1"

    "github.com/kubernetes/apimachinery/pkg/fields"
    "github.com/kubernetes/apimachinery/pkg/labels"
)

func TestScrapeURLNoAnnotations(t *testing.T) {
@@ -142,6 +145,62 @@ func TestPodSelector(t *testing.T) {
    }
}

func TestPodHasMatchingNamespace(t *testing.T) {
    prom := &Prometheus{Log: testutil.Logger{}, PodNamespace: "default"}

    pod := pod()
    pod.Metadata.Name = str("Pod1")
    pod.Metadata.Namespace = str("default")
    shouldMatch := podHasMatchingNamespace(pod, prom)
    assert.Equal(t, true, shouldMatch)

    pod.Metadata.Name = str("Pod2")
    pod.Metadata.Namespace = str("namespace")
    shouldNotMatch := podHasMatchingNamespace(pod, prom)
    assert.Equal(t, false, shouldNotMatch)
}

func TestPodHasMatchingLabelSelector(t *testing.T) {
    labelSelectorString := "label0==label0,label1=label1,label2!=label,label3 in (label1,label2, label3),label4 notin (label1, label2,label3),label5,!label6"
    prom := &Prometheus{Log: testutil.Logger{}, KubernetesLabelSelector: labelSelectorString}

    pod := pod()
    pod.Metadata.Labels = make(map[string]string)
    pod.Metadata.Labels["label0"] = "label0"
    pod.Metadata.Labels["label1"] = "label1"
    pod.Metadata.Labels["label2"] = "label2"
    pod.Metadata.Labels["label3"] = "label3"
    pod.Metadata.Labels["label4"] = "label4"
    pod.Metadata.Labels["label5"] = "label5"

    labelSelector, err := labels.Parse(prom.KubernetesLabelSelector)
    assert.Equal(t, err, nil)
    assert.Equal(t, true, podHasMatchingLabelSelector(pod, labelSelector))
}

func TestPodHasMatchingFieldSelector(t *testing.T) {
    fieldSelectorString := "status.podIP=127.0.0.1,spec.restartPolicy=Always,spec.NodeName!=nodeName"
    prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString}
    pod := pod()
    pod.Spec.RestartPolicy = str("Always")
    pod.Spec.NodeName = str("node1000")

    fieldSelector, err := fields.ParseSelector(prom.KubernetesFieldSelector)
    assert.Equal(t, err, nil)
    assert.Equal(t, true, podHasMatchingFieldSelector(pod, fieldSelector))
}

func TestInvalidFieldSelector(t *testing.T) {
    fieldSelectorString := "status.podIP=127.0.0.1,spec.restartPolicy=Always,spec.NodeName!=nodeName,spec.nodeName"
    prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString}
    pod := pod()
    pod.Spec.RestartPolicy = str("Always")
    pod.Spec.NodeName = str("node1000")

    _, err := fields.ParseSelector(prom.KubernetesFieldSelector)
    assert.NotEqual(t, err, nil)
}

func pod() *v1.Pod {
    p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}, Spec: &v1.PodSpec{}}
    p.Status.PodIP = str("127.0.0.1")
@@ -8,6 +8,8 @@ import (
    "net"
    "net/http"
    "net/url"
    "os"
    "strings"
    "sync"
    "time"
@@ -16,6 +18,8 @@ import (
    "github.com/influxdata/telegraf/plugins/common/tls"
    "github.com/influxdata/telegraf/plugins/inputs"
    parser_v2 "github.com/influxdata/telegraf/plugins/parsers/prometheus"
    "github.com/kubernetes/apimachinery/pkg/fields"
    "github.com/kubernetes/apimachinery/pkg/labels"
)

const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
@@ -57,12 +61,21 @@ type Prometheus struct {
     client *http.Client

     // Should we scrape Kubernetes services for prometheus annotations
-    MonitorPods    bool   `toml:"monitor_kubernetes_pods"`
-    PodNamespace   string `toml:"monitor_kubernetes_pods_namespace"`
-    lock           sync.Mutex
-    kubernetesPods map[string]URLAndAddress
-    cancel         context.CancelFunc
-    wg             sync.WaitGroup
+    MonitorPods       bool   `toml:"monitor_kubernetes_pods"`
+    PodScrapeScope    string `toml:"pod_scrape_scope"`
+    NodeIP            string `toml:"node_ip"`
+    PodScrapeInterval int    `toml:"pod_scrape_interval"`
+    PodNamespace      string `toml:"monitor_kubernetes_pods_namespace"`
+    lock              sync.Mutex
+    kubernetesPods    map[string]URLAndAddress
+    cancel            context.CancelFunc
+    wg                sync.WaitGroup
+
+    // Only for monitor_kubernetes_pods=true and pod_scrape_scope="node"
+    podLabelSelector  labels.Selector
+    podFieldSelector  fields.Selector
+    nodeIP            string
+    isNodeScrapeScope bool
 }

 var sampleConfig = `
@@ -94,6 +107,16 @@ var sampleConfig = `
  ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
  ## - prometheus.io/port: If port is not 9102 use this annotation
  # monitor_kubernetes_pods = true
  ## Get the list of pods to scrape with either the scope of
  ## - cluster: the kubernetes watch api (default, no need to specify)
  ## - node: the local cadvisor api; for scalability. Note that the config node_ip or the environment variable NODE_IP must be set to the host IP.
  # pod_scrape_scope = "cluster"
  ## Only for node scrape scope: node IP of the node that telegraf is running on.
  ## Either this config or the environment variable NODE_IP must be set.
  # node_ip = "10.180.1.1"
  ## Only for node scrape scope: interval in seconds for how often to get updated pod list for scraping.
  ## Default is 60 seconds.
  # pod_scrape_interval = 60
  ## Restricts Kubernetes monitoring to a single namespace
  ## ex: monitor_kubernetes_pods_namespace = "default"
  # monitor_kubernetes_pods_namespace = ""
@@ -137,6 +160,43 @@ func (p *Prometheus) Init() error {
        p.Log.Warnf("Use of deprecated configuration: 'metric_version = 1'; please update to 'metric_version = 2'")
    }

    // Config processing for node scrape scope for monitor_kubernetes_pods
    p.isNodeScrapeScope = strings.EqualFold(p.PodScrapeScope, "node")
    if p.isNodeScrapeScope {
        // Need node IP to make cAdvisor call for pod list. Check if set in config and valid IP address
        if p.NodeIP == "" || net.ParseIP(p.NodeIP) == nil {
            p.Log.Infof("The config node_ip is empty or invalid. Using NODE_IP env var as default.")

            // Check if set as env var and is valid IP address
            envVarNodeIP := os.Getenv("NODE_IP")
            if envVarNodeIP == "" || net.ParseIP(envVarNodeIP) == nil {
                errorMessage := "The node_ip config and the environment variable NODE_IP are not set or invalid. Cannot get pod list for monitor_kubernetes_pods using node scrape scope"
                return errors.New(errorMessage)
            }

            p.NodeIP = envVarNodeIP
        }

        // Parse label and field selectors - will be used to filter pods after cAdvisor call
        var err error
        p.podLabelSelector, err = labels.Parse(p.KubernetesLabelSelector)
        if err != nil {
            return fmt.Errorf("Error parsing the specified label selector(s): %s", err.Error())
        }
        p.podFieldSelector, err = fields.ParseSelector(p.KubernetesFieldSelector)
        if err != nil {
            return fmt.Errorf("Error parsing the specified field selector(s): %s", err.Error())
        }
        isValid, invalidSelector := fieldSelectorIsSupported(p.podFieldSelector)
        if !isValid {
            return fmt.Errorf("The field selector %s is not supported for pods", invalidSelector)
        }

        p.Log.Infof("Using pod scrape scope at node level to get pod list using cAdvisor.")
        p.Log.Infof("Using the label selector: %v and field selector: %v", p.podLabelSelector, p.podFieldSelector)
    }

    return nil
}
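Since `Init` parses `KubernetesLabelSelector` and `KubernetesFieldSelector` before any cAdvisor call, node-scoped scraping can pre-filter pods with selectors. A minimal sketch, assuming the usual `kubernetes_label_selector` and `kubernetes_field_selector` option names (their TOML tags are not shown in this diff) and illustrative selector values:

```toml
[[inputs.prometheus]]
  monitor_kubernetes_pods = true
  pod_scrape_scope = "node"
  node_ip = "10.180.1.1"
  ## Assumed option names; only pods matching both selectors are registered.
  ## Field names must be in the supported set checked by fieldSelectorIsSupported.
  kubernetes_label_selector = "app=my-app,env!=staging"
  kubernetes_field_selector = "status.phase=Running"
```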
@@ -372,6 +432,30 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
    return nil
}

/* Check if the field selector specified is valid.
 * See ToSelectableFields() for list of fields that are selectable:
 * https://github.com/kubernetes/kubernetes/release-1.20/pkg/registry/core/pod/strategy.go
 */
func fieldSelectorIsSupported(fieldSelector fields.Selector) (bool, string) {
    supportedFieldsToSelect := map[string]bool{
        "spec.nodeName":            true,
        "spec.restartPolicy":       true,
        "spec.schedulerName":       true,
        "spec.serviceAccountName":  true,
        "status.phase":             true,
        "status.podIP":             true,
        "status.nominatedNodeName": true,
    }

    for _, requirement := range fieldSelector.Requirements() {
        if !supportedFieldsToSelect[requirement.Field] {
            return false, requirement.Field
        }
    }

    return true, ""
}

// Start will start the Kubernetes scraping if enabled in the configuration
func (p *Prometheus) Start(a telegraf.Accumulator) error {
    if p.MonitorPods {
@@ -6,11 +6,13 @@ import (
    "net/http"
    "net/http/httptest"
    "net/url"
    "os"
    "testing"
    "time"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/testutil"
    "github.com/kubernetes/apimachinery/pkg/fields"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)
@@ -234,3 +236,49 @@ func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) {
    assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics")
    assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0)))
}

func TestUnsupportedFieldSelector(t *testing.T) {
    fieldSelectorString := "spec.containerName=container"
    prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString}

    fieldSelector, _ := fields.ParseSelector(prom.KubernetesFieldSelector)
    isValid, invalidSelector := fieldSelectorIsSupported(fieldSelector)
    assert.Equal(t, false, isValid)
    assert.Equal(t, "spec.containerName", invalidSelector)
}

func TestInitConfigErrors(t *testing.T) {
    p := &Prometheus{
        MetricVersion:     2,
        Log:               testutil.Logger{},
        URLs:              nil,
        URLTag:            "url",
        MonitorPods:       true,
        PodScrapeScope:    "node",
        PodScrapeInterval: 60,
    }

    // Both invalid IP addresses
    p.NodeIP = "10.240.0.0.0"
    os.Setenv("NODE_IP", "10.000.0.0.0")
    err := p.Init()
    expectedMessage := "The node_ip config and the environment variable NODE_IP are not set or invalid. Cannot get pod list for monitor_kubernetes_pods using node scrape scope"
    assert.Equal(t, expectedMessage, err.Error())
    os.Setenv("NODE_IP", "10.000.0.0")

    p.KubernetesLabelSelector = "label0==label0, label0 in (=)"
    err = p.Init()
    expectedMessage = "Error parsing the specified label selector(s): unable to parse requirement: found '=', expected: ',', ')' or identifier"
    assert.Equal(t, expectedMessage, err.Error())
    p.KubernetesLabelSelector = "label0==label"

    p.KubernetesFieldSelector = "field,"
    err = p.Init()
    expectedMessage = "Error parsing the specified field selector(s): invalid selector: 'field,'; can't understand 'field'"
    assert.Equal(t, expectedMessage, err.Error())

    p.KubernetesFieldSelector = "spec.containerNames=containerNames"
    err = p.Init()
    expectedMessage = "The field selector spec.containerNames is not supported for pods"
    assert.Equal(t, expectedMessage, err.Error())
}