fix(inputs.prometheus): moved from watcher to informer (#10932)

This commit is contained in:
Shubham Kumar Singh Rajput 2022-04-08 19:39:02 +05:30 committed by GitHub
parent 8e2b4988fe
commit 777f8bf715
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 43 deletions

View File

@ -63,6 +63,10 @@ in Prometheus format.
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
# cache refresh interval to set the interval for re-sync of pods list.
# Default is 60 minutes.
# cache_refresh_interval = 60
## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true

View File

@ -18,9 +18,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
type podMetadata struct {
@ -89,10 +91,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
}
} else {
err = p.watchPod(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
}
p.watchPod(ctx, client)
}
}
}
@ -105,48 +104,114 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
// pod, causing errors in the logs. This is only true if the pod going offline is not
// directed to do so by K8s.
func (p *Prometheus) watchPod(ctx context.Context, client *kubernetes.Clientset) error {
watcher, err := client.CoreV1().Pods(p.PodNamespace).Watch(ctx, metav1.ListOptions{
LabelSelector: p.KubernetesLabelSelector,
FieldSelector: p.KubernetesFieldSelector,
})
if err != nil {
return err
func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clientset) {
var resyncinterval time.Duration
if p.CacheRefreshInterval != 0 {
resyncinterval = time.Duration(p.CacheRefreshInterval) * time.Minute
} else {
resyncinterval = 60 * time.Minute
}
defer watcher.Stop()
for {
select {
case <-ctx.Done():
return nil
default:
for event := range watcher.ResultChan() {
pod, ok := event.Object.(*corev1.Pod)
if !ok {
return fmt.Errorf("Unexpected object when getting pods")
}
informerfactory := informers.NewSharedInformerFactory(clientset, resyncinterval)
// If the pod is not "ready", there will be no ip associated with it.
if pod.Annotations["prometheus.io/scrape"] != "true" ||
!podReady(pod.Status.ContainerStatuses) {
continue
}
podinformer := informerfactory.Core().V1().Pods()
podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}
switch event.Type {
case watch.Added:
registerPod(pod, p)
case watch.Modified:
// To avoid multiple actions for each event, unregister on the first event
// in the delete sequence, when the containers are still "ready".
if pod.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
} else {
registerPod(pod, p)
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}
pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if pod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(pod.Status.ContainerStatuses) &&
podHasMatchingNamespace(pod, p) &&
podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
podHasMatchingFieldSelector(pod, p.podFieldSelector) {
registerPod(pod, p)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
newKey, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}
newNamespace, newName, err := cache.SplitMetaNamespaceKey(newKey)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}
newPod, _ := clientset.CoreV1().Pods(newNamespace).Get(ctx, newName, metav1.GetOptions{})
if newPod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(newPod.Status.ContainerStatuses) &&
podHasMatchingNamespace(newPod, p) &&
podHasMatchingLabelSelector(newPod, p.podLabelSelector) &&
podHasMatchingFieldSelector(newPod, p.podFieldSelector) {
if newPod.GetDeletionTimestamp() == nil {
registerPod(newPod, p)
}
}
}
}
oldKey, err := cache.MetaNamespaceKeyFunc(oldObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}
oldNamespace, oldName, err := cache.SplitMetaNamespaceKey(oldKey)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}
oldPod, _ := clientset.CoreV1().Pods(oldNamespace).Get(ctx, oldName, metav1.GetOptions{})
if oldPod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(oldPod.Status.ContainerStatuses) &&
podHasMatchingNamespace(oldPod, p) &&
podHasMatchingLabelSelector(oldPod, p.podLabelSelector) &&
podHasMatchingFieldSelector(oldPod, p.podFieldSelector) {
if oldPod.GetDeletionTimestamp() != nil {
unregisterPod(oldPod, p)
}
}
},
DeleteFunc: func(oldObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(oldObj)
if err != nil {
p.Log.Errorf("getting key from cache %s", err.Error())
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}
pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if pod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(pod.Status.ContainerStatuses) &&
podHasMatchingNamespace(pod, p) &&
podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
podHasMatchingFieldSelector(pod, p.podFieldSelector) {
if pod.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
}
}
},
})
informerfactory.Start(wait.NeverStop)
informerfactory.WaitForCacheSync(wait.NeverStop)
<-ctx.Done()
}
func (p *Prometheus) cAdvisor(ctx context.Context, bearerToken string) error {
@ -372,11 +437,10 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) {
return
}
p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.kubernetesPods[targetURL.String()]; ok {
p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
delete(p.kubernetesPods, targetURL.String())
p.Log.Debugf("will stop scraping for %q", targetURL.String())
}

View File

@ -84,6 +84,9 @@ type Prometheus struct {
podFieldSelector fields.Selector
isNodeScrapeScope bool
// Only for monitor_kubernetes_pods=true
CacheRefreshInterval int `toml:"cache_refresh_interval"`
// List of consul services to scrape
consulServices map[string]URLAndAddress
}
@ -140,6 +143,10 @@ var sampleConfig = `
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
# cache refresh interval to set the interval for re-sync of pods list.
# Default is 60 minutes.
# cache_refresh_interval = 60
## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true