fix(inputs.prometheus): correctly track deleted pods (#12522)

This commit is contained in:
Maxim Ivanov 2023-01-23 15:13:51 +00:00 committed by GitHub
parent 410226051d
commit 51f23d244e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 99 deletions

View File

@ -14,7 +14,6 @@ import (
"time" "time"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -98,7 +97,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
} }
func shouldScrapePod(pod *corev1.Pod, p *Prometheus) bool { func shouldScrapePod(pod *corev1.Pod, p *Prometheus) bool {
isCandidate := podReady(pod.Status.ContainerStatuses) && isCandidate := podReady(pod) &&
podHasMatchingNamespace(pod, p) && podHasMatchingNamespace(pod, p) &&
podHasMatchingLabelSelector(pod, p.podLabelSelector) && podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
podHasMatchingFieldSelector(pod, p.podFieldSelector) podHasMatchingFieldSelector(pod, p.podFieldSelector)
@ -116,6 +115,9 @@ func shouldScrapePod(pod *corev1.Pod, p *Prometheus) bool {
return isCandidate && shouldScrape return isCandidate && shouldScrape
} }
// Share informer across all instances of this plugin
var informerfactory informers.SharedInformerFactory
// An edge case exists if a pod goes offline at the same time a new pod is created // An edge case exists if a pod goes offline at the same time a new pod is created
// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape // (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
// pod, causing errors in the logs. This is only true if the pod going offline is not // pod, causing errors in the logs. This is only true if the pod going offline is not
@ -129,86 +131,55 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients
resyncinterval = 60 * time.Minute resyncinterval = 60 * time.Minute
} }
informerfactory := informers.NewSharedInformerFactory(clientset, resyncinterval) if informerfactory == nil {
informerfactory = informers.NewSharedInformerFactory(clientset, resyncinterval)
}
podinformer := informerfactory.Core().V1().Pods() podinformer := informerfactory.Core().V1().Pods()
podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(newObj interface{}) { AddFunc: func(newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj) newPod, ok := newObj.(*corev1.Pod)
if err != nil { if !ok {
p.Log.Errorf("getting key from cache %s\n", err.Error()) p.Log.Errorf("[BUG] received unexpected object: %v", newObj)
return
} }
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}
pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if shouldScrapePod(pod, p) {
registerPod(pod, p)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
newKey, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}
newNamespace, newName, err := cache.SplitMetaNamespaceKey(newKey)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}
newPod, _ := clientset.CoreV1().Pods(newNamespace).Get(ctx, newName, metav1.GetOptions{})
if shouldScrapePod(newPod, p) { if shouldScrapePod(newPod, p) {
if newPod.GetDeletionTimestamp() == nil { registerPod(newPod, p)
registerPod(newPod, p)
}
}
oldKey, err := cache.MetaNamespaceKeyFunc(oldObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}
oldNamespace, oldName, err := cache.SplitMetaNamespaceKey(oldKey)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}
oldPod, _ := clientset.CoreV1().Pods(oldNamespace).Get(ctx, oldName, metav1.GetOptions{})
if shouldScrapePod(oldPod, p) {
if oldPod.GetDeletionTimestamp() != nil {
unregisterPod(oldPod, p)
}
} }
}, },
DeleteFunc: func(oldObj interface{}) { // On Pod status updates and regular reList by Informer
key, err := cache.MetaNamespaceKeyFunc(oldObj) UpdateFunc: func(_, newObj interface{}) {
newPod, ok := newObj.(*corev1.Pod)
if !ok {
p.Log.Errorf("[BUG] received unexpected object: %v", newObj)
return
}
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
if err != nil { if err != nil {
p.Log.Errorf("getting key from cache %s", err.Error()) p.Log.Errorf("getting key from cache %s", err.Error())
} }
podID := PodID(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key) if shouldScrapePod(newPod, p) {
if err != nil { // When the Informer re-lists, the pod might already be registered,
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error()) // do nothing if it is, register otherwise
} if _, ok = p.kubernetesPods[podID]; !ok {
registerPod(newPod, p)
pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if shouldScrapePod(pod, p) {
if pod.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
} }
} else {
// Pods are largely immutable, but their readiness status can change; unregister the pod in that case
unregisterPod(podID, p)
}
},
DeleteFunc: func(oldObj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(oldObj)
if err == nil {
unregisterPod(PodID(key), p)
} }
}, },
}) })
informerfactory.Start(wait.NeverStop) informerfactory.Start(ctx.Done())
informerfactory.WaitForCacheSync(wait.NeverStop) informerfactory.WaitForCacheSync(wait.NeverStop)
<-ctx.Done() <-ctx.Done()
@ -275,7 +246,7 @@ func updateCadvisorPodList(p *Prometheus, req *http.Request) error {
// Updating pod list to be latest cadvisor response // Updating pod list to be latest cadvisor response
p.lock.Lock() p.lock.Lock()
p.kubernetesPods = make(map[string]URLAndAddress) p.kubernetesPods = make(map[PodID]URLAndAddress)
// Register pod only if it has an annotation to scrape, if it is ready, // Register pod only if it has an annotation to scrape, if it is ready,
// and if namespace and selectors are specified and match // and if namespace and selectors are specified and match
@ -338,21 +309,18 @@ func podHasMatchingNamespace(pod *corev1.Pod, p *Prometheus) bool {
return !(p.PodNamespace != "" && pod.Namespace != p.PodNamespace) return !(p.PodNamespace != "" && pod.Namespace != p.PodNamespace)
} }
func podReady(statuss []corev1.ContainerStatus) bool { func podReady(pod *corev1.Pod) bool {
if len(statuss) == 0 { for _, cond := range pod.Status.Conditions {
return false if cond.Type == corev1.PodReady {
} return true
for _, cs := range statuss {
if !cs.Ready {
return false
} }
} }
return true return false
} }
func registerPod(pod *corev1.Pod, p *Prometheus) { func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil { if p.kubernetesPods == nil {
p.kubernetesPods = map[string]URLAndAddress{} p.kubernetesPods = map[PodID]URLAndAddress{}
} }
targetURL, err := getScrapeURL(pod, p) targetURL, err := getScrapeURL(pod, p)
if err != nil { if err != nil {
@ -388,7 +356,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
} }
p.kubernetesPods[podURL.String()] = URLAndAddress{ p.kubernetesPods[PodID(pod.GetNamespace()+"/"+pod.GetName())] = URLAndAddress{
URL: podURL, URL: podURL,
Address: targetURL.Hostname(), Address: targetURL.Hostname(),
OriginalURL: targetURL, OriginalURL: targetURL,
@ -449,20 +417,12 @@ func getScrapeURL(pod *corev1.Pod, p *Prometheus) (*url.URL, error) {
return base, nil return base, nil
} }
func unregisterPod(pod *corev1.Pod, p *Prometheus) { func unregisterPod(podID PodID, p *Prometheus) {
targetURL, err := getScrapeURL(pod, p)
if err != nil {
p.Log.Errorf("failed to parse url: %s", err)
return
} else if targetURL == nil {
return
}
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
if _, ok := p.kubernetesPods[targetURL.String()]; ok { if v, ok := p.kubernetesPods[podID]; ok {
p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace) p.Log.Debugf("registered a delete request for %s", podID)
delete(p.kubernetesPods, targetURL.String()) delete(p.kubernetesPods, podID)
p.Log.Debugf("will stop scraping for %q", targetURL.String()) p.Log.Debugf("will stop scraping for %q", v.URL.String())
} }
} }

View File

@ -1,6 +1,7 @@
package prometheus package prometheus
import ( import (
"k8s.io/client-go/tools/cache"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -153,7 +154,9 @@ func TestAddMultipleDuplicatePods(t *testing.T) {
registerPod(p, prom) registerPod(p, prom)
p.Name = "Pod2" p.Name = "Pod2"
registerPod(p, prom) registerPod(p, prom)
require.Equal(t, 1, len(prom.kubernetesPods))
urls, _ := prom.GetAllURLs()
require.Equal(t, 1, len(urls))
} }
func TestAddMultiplePods(t *testing.T) { func TestAddMultiplePods(t *testing.T) {
@ -174,7 +177,9 @@ func TestDeletePods(t *testing.T) {
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom) registerPod(p, prom)
unregisterPod(p, prom)
podID, _ := cache.MetaNamespaceKeyFunc(p)
unregisterPod(PodID(podID), prom)
require.Equal(t, 0, len(prom.kubernetesPods)) require.Equal(t, 0, len(prom.kubernetesPods))
} }
@ -184,7 +189,9 @@ func TestKeepDefaultNamespaceLabelName(t *testing.T) {
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom) registerPod(p, prom)
tags := prom.kubernetesPods["http://127.0.0.1:9102/metrics"].Tags
podID, _ := cache.MetaNamespaceKeyFunc(p)
tags := prom.kubernetesPods[PodID(podID)].Tags
require.Equal(t, "default", tags["namespace"]) require.Equal(t, "default", tags["namespace"])
} }
@ -194,7 +201,9 @@ func TestChangeNamespaceLabelName(t *testing.T) {
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom) registerPod(p, prom)
tags := prom.kubernetesPods["http://127.0.0.1:9102/metrics"].Tags
podID, _ := cache.MetaNamespaceKeyFunc(p)
tags := prom.kubernetesPods[PodID(podID)].Tags
require.Equal(t, "default", tags["pod_namespace"]) require.Equal(t, "default", tags["pod_namespace"])
require.Equal(t, "", tags["namespace"]) require.Equal(t, "", tags["namespace"])
} }

View File

@ -40,6 +40,8 @@ const (
MonitorMethodSettingsAndAnnotations MonitorMethod = "settings+annotations" MonitorMethodSettingsAndAnnotations MonitorMethod = "settings+annotations"
) )
type PodID string
type Prometheus struct { type Prometheus struct {
// An array of urls to scrape metrics from. // An array of urls to scrape metrics from.
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
@ -92,7 +94,7 @@ type Prometheus struct {
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"` PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
PodNamespaceLabelName string `toml:"pod_namespace_label_name"` PodNamespaceLabelName string `toml:"pod_namespace_label_name"`
lock sync.Mutex lock sync.Mutex
kubernetesPods map[string]URLAndAddress kubernetesPods map[PodID]URLAndAddress
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
@ -201,7 +203,7 @@ type URLAndAddress struct {
} }
func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
allURLs := make(map[string]URLAndAddress) allURLs := make(map[string]URLAndAddress, len(p.URLs)+len(p.consulServices)+len(p.kubernetesPods))
for _, u := range p.URLs { for _, u := range p.URLs {
address, err := url.Parse(u) address, err := url.Parse(u)
if err != nil { if err != nil {
@ -218,8 +220,8 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
allURLs[k] = v allURLs[k] = v
} }
// loop through all pods scraped via the prometheus annotation on the pods // loop through all pods scraped via the prometheus annotation on the pods
for k, v := range p.kubernetesPods { for _, v := range p.kubernetesPods {
allURLs[k] = v allURLs[v.URL.String()] = v
} }
for _, service := range p.KubernetesServices { for _, service := range p.KubernetesServices {
@ -452,7 +454,7 @@ func init() {
inputs.Add("prometheus", func() telegraf.Input { inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{ return &Prometheus{
ResponseTimeout: config.Duration(time.Second * 3), ResponseTimeout: config.Duration(time.Second * 3),
kubernetesPods: map[string]URLAndAddress{}, kubernetesPods: map[PodID]URLAndAddress{},
consulServices: map[string]URLAndAddress{}, consulServices: map[string]URLAndAddress{},
URLTag: "url", URLTag: "url",
} }