diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index c8cff40df..cac333353 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -14,7 +14,6 @@ import ( "time" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" @@ -98,7 +97,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error { } func shouldScrapePod(pod *corev1.Pod, p *Prometheus) bool { - isCandidate := podReady(pod.Status.ContainerStatuses) && + isCandidate := podReady(pod) && podHasMatchingNamespace(pod, p) && podHasMatchingLabelSelector(pod, p.podLabelSelector) && podHasMatchingFieldSelector(pod, p.podFieldSelector) @@ -116,6 +115,9 @@ func shouldScrapePod(pod *corev1.Pod, p *Prometheus) bool { return isCandidate && shouldScrape } +// Share informer across all instances of this plugin +var informerfactory informers.SharedInformerFactory + // An edge case exists if a pod goes offline at the same time a new pod is created // (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape // pod, causing errors in the logs. 
This is only true if the pod going offline is not @@ -129,86 +131,55 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients resyncinterval = 60 * time.Minute } - informerfactory := informers.NewSharedInformerFactory(clientset, resyncinterval) + if informerfactory == nil { + informerfactory = informers.NewSharedInformerFactory(clientset, resyncinterval) + } podinformer := informerfactory.Core().V1().Pods() podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(newObj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(newObj) - if err != nil { - p.Log.Errorf("getting key from cache %s\n", err.Error()) + newPod, ok := newObj.(*corev1.Pod) + if !ok { + p.Log.Errorf("[BUG] received unexpected object: %v", newObj) + return } - - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - p.Log.Errorf("splitting key into namespace and name %s\n", err.Error()) - } - - pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) - - if shouldScrapePod(pod, p) { - registerPod(pod, p) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - newKey, err := cache.MetaNamespaceKeyFunc(newObj) - if err != nil { - p.Log.Errorf("getting key from cache %s\n", err.Error()) - } - - newNamespace, newName, err := cache.SplitMetaNamespaceKey(newKey) - if err != nil { - p.Log.Errorf("splitting key into namespace and name %s\n", err.Error()) - } - - newPod, _ := clientset.CoreV1().Pods(newNamespace).Get(ctx, newName, metav1.GetOptions{}) - if shouldScrapePod(newPod, p) { - if newPod.GetDeletionTimestamp() == nil { - registerPod(newPod, p) - } - } - - oldKey, err := cache.MetaNamespaceKeyFunc(oldObj) - if err != nil { - p.Log.Errorf("getting key from cache %s\n", err.Error()) - } - - oldNamespace, oldName, err := cache.SplitMetaNamespaceKey(oldKey) - if err != nil { - p.Log.Errorf("splitting key into namespace and name %s\n", err.Error()) - } - - oldPod, _ := 
clientset.CoreV1().Pods(oldNamespace).Get(ctx, oldName, metav1.GetOptions{}) - - if shouldScrapePod(oldPod, p) { - if oldPod.GetDeletionTimestamp() != nil { - unregisterPod(oldPod, p) - } + registerPod(newPod, p) } }, - DeleteFunc: func(oldObj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(oldObj) + // On Pod status updates and regular reList by Informer + UpdateFunc: func(_, newObj interface{}) { + newPod, ok := newObj.(*corev1.Pod) + if !ok { + p.Log.Errorf("[BUG] received unexpected object: %v", newObj) + return + } + + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj) if err != nil { p.Log.Errorf("getting key from cache %s", err.Error()) } - - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - p.Log.Errorf("splitting key into namespace and name %s\n", err.Error()) - } - - pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) - - if shouldScrapePod(pod, p) { - if pod.GetDeletionTimestamp() != nil { - unregisterPod(pod, p) + podID := PodID(key) + if shouldScrapePod(newPod, p) { + // When Informers re-Lists, pod might already be registered, + // do nothing if it is, register otherwise + if _, ok = p.kubernetesPods[podID]; !ok { + registerPod(newPod, p) } + } else { + // Pods are largely immutable, but it's readiness status can change, unregister then + unregisterPod(podID, p) + } + }, + DeleteFunc: func(oldObj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(oldObj) + if err == nil { + unregisterPod(PodID(key), p) } }, }) - informerfactory.Start(wait.NeverStop) + informerfactory.Start(ctx.Done()) informerfactory.WaitForCacheSync(wait.NeverStop) <-ctx.Done() @@ -275,7 +246,7 @@ func updateCadvisorPodList(p *Prometheus, req *http.Request) error { // Updating pod list to be latest cadvisor response p.lock.Lock() - p.kubernetesPods = make(map[string]URLAndAddress) + p.kubernetesPods = make(map[PodID]URLAndAddress) // Register pod only if it has an annotation to scrape, if 
it is ready, // and if namespace and selectors are specified and match @@ -338,21 +309,18 @@ func podHasMatchingNamespace(pod *corev1.Pod, p *Prometheus) bool { return !(p.PodNamespace != "" && pod.Namespace != p.PodNamespace) } -func podReady(statuss []corev1.ContainerStatus) bool { - if len(statuss) == 0 { - return false - } - for _, cs := range statuss { - if !cs.Ready { - return false +func podReady(pod *corev1.Pod) bool { + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodReady { + return cond.Status == corev1.ConditionTrue } } - return true + return false } func registerPod(pod *corev1.Pod, p *Prometheus) { if p.kubernetesPods == nil { - p.kubernetesPods = map[string]URLAndAddress{} + p.kubernetesPods = map[PodID]URLAndAddress{} } targetURL, err := getScrapeURL(pod, p) if err != nil { @@ -388,7 +356,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { p.lock.Lock() defer p.lock.Unlock() } - p.kubernetesPods[podURL.String()] = URLAndAddress{ + p.kubernetesPods[PodID(pod.GetNamespace()+"/"+pod.GetName())] = URLAndAddress{ URL: podURL, Address: targetURL.Hostname(), OriginalURL: targetURL, @@ -449,20 +417,12 @@ func getScrapeURL(pod *corev1.Pod, p *Prometheus) (*url.URL, error) { return base, nil } -func unregisterPod(pod *corev1.Pod, p *Prometheus) { - targetURL, err := getScrapeURL(pod, p) - if err != nil { - p.Log.Errorf("failed to parse url: %s", err) - return - } else if targetURL == nil { - return - } - +func unregisterPod(podID PodID, p *Prometheus) { p.lock.Lock() defer p.lock.Unlock() - if _, ok := p.kubernetesPods[targetURL.String()]; ok { - p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace) - delete(p.kubernetesPods, targetURL.String()) - p.Log.Debugf("will stop scraping for %q", targetURL.String()) + if v, ok := p.kubernetesPods[podID]; ok { + p.Log.Debugf("registered a delete request for %s", podID) + delete(p.kubernetesPods, podID) + p.Log.Debugf("will stop scraping for %q", v.URL.String()) } } diff --git 
a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 65fffc7b2..b7624e6e3 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -1,6 +1,7 @@ package prometheus import ( + "k8s.io/client-go/tools/cache" "testing" "github.com/stretchr/testify/require" @@ -153,7 +154,9 @@ func TestAddMultipleDuplicatePods(t *testing.T) { registerPod(p, prom) p.Name = "Pod2" registerPod(p, prom) - require.Equal(t, 1, len(prom.kubernetesPods)) + + urls, _ := prom.GetAllURLs() + require.Equal(t, 1, len(urls)) } func TestAddMultiplePods(t *testing.T) { @@ -174,7 +177,9 @@ func TestDeletePods(t *testing.T) { p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - unregisterPod(p, prom) + + podID, _ := cache.MetaNamespaceKeyFunc(p) + unregisterPod(PodID(podID), prom) require.Equal(t, 0, len(prom.kubernetesPods)) } @@ -184,7 +189,9 @@ func TestKeepDefaultNamespaceLabelName(t *testing.T) { p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - tags := prom.kubernetesPods["http://127.0.0.1:9102/metrics"].Tags + + podID, _ := cache.MetaNamespaceKeyFunc(p) + tags := prom.kubernetesPods[PodID(podID)].Tags require.Equal(t, "default", tags["namespace"]) } @@ -194,7 +201,9 @@ func TestChangeNamespaceLabelName(t *testing.T) { p := pod() p.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - tags := prom.kubernetesPods["http://127.0.0.1:9102/metrics"].Tags + + podID, _ := cache.MetaNamespaceKeyFunc(p) + tags := prom.kubernetesPods[PodID(podID)].Tags require.Equal(t, "default", tags["pod_namespace"]) require.Equal(t, "", tags["namespace"]) } diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 28f92b608..6c8cd34be 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -40,6 +40,8 @@ 
const ( MonitorMethodSettingsAndAnnotations MonitorMethod = "settings+annotations" ) +type PodID string + type Prometheus struct { // An array of urls to scrape metrics from. URLs []string `toml:"urls"` @@ -92,7 +94,7 @@ type Prometheus struct { PodNamespace string `toml:"monitor_kubernetes_pods_namespace"` PodNamespaceLabelName string `toml:"pod_namespace_label_name"` lock sync.Mutex - kubernetesPods map[string]URLAndAddress + kubernetesPods map[PodID]URLAndAddress cancel context.CancelFunc wg sync.WaitGroup @@ -201,7 +203,7 @@ type URLAndAddress struct { } func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { - allURLs := make(map[string]URLAndAddress) + allURLs := make(map[string]URLAndAddress, len(p.URLs)+len(p.consulServices)+len(p.kubernetesPods)) for _, u := range p.URLs { address, err := url.Parse(u) if err != nil { @@ -218,8 +220,8 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { allURLs[k] = v } // loop through all pods scraped via the prometheus annotation on the pods - for k, v := range p.kubernetesPods { - allURLs[k] = v + for _, v := range p.kubernetesPods { + allURLs[v.URL.String()] = v } for _, service := range p.KubernetesServices { @@ -452,7 +454,7 @@ func init() { inputs.Add("prometheus", func() telegraf.Input { return &Prometheus{ ResponseTimeout: config.Duration(time.Second * 3), - kubernetesPods: map[string]URLAndAddress{}, + kubernetesPods: map[PodID]URLAndAddress{}, consulServices: map[string]URLAndAddress{}, URLTag: "url", }