fix(inputs.prometheus): Avoid race when creating informer factory (#13231)

This commit is contained in:
Maxim Ivanov 2023-05-08 14:25:32 +00:00 committed by GitHub
parent 1bcc279d68
commit 5cb928cc5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 17 deletions

View File

@ -76,6 +76,13 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
} }
} }
if !p.isNodeScrapeScope {
err = p.watchPod(ctx, client)
if err != nil {
p.Log.Warnf("Error while attempting to watch pod: %s", err.Error())
}
}
p.wg.Add(1) p.wg.Add(1)
go func() { go func() {
defer p.wg.Done() defer p.wg.Done()
@ -90,10 +97,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error()) p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
} }
} else { } else {
err = p.watchPod(ctx, client) <-ctx.Done()
if err != nil {
p.Log.Warnf("Error while attempting to watch pod: %s", err.Error())
}
} }
} }
} }
@ -193,8 +197,6 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients
informerfactory.Start(ctx.Done()) informerfactory.Start(ctx.Done())
informerfactory.WaitForCacheSync(wait.NeverStop) informerfactory.WaitForCacheSync(wait.NeverStop)
<-ctx.Done()
return err return err
} }
@ -366,9 +368,6 @@ func podReady(pod *corev1.Pod) bool {
} }
func registerPod(pod *corev1.Pod, p *Prometheus) { func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil {
p.kubernetesPods = map[PodID]URLAndAddress{}
}
targetURL, err := getScrapeURL(pod, p) targetURL, err := getScrapeURL(pod, p)
if err != nil { if err != nil {
p.Log.Errorf("could not parse URL: %s", err) p.Log.Errorf("could not parse URL: %s", err)

View File

@ -20,6 +20,7 @@ func initPrometheus() *Prometheus {
prom.MonitorKubernetesPodsPort = 9102 prom.MonitorKubernetesPodsPort = 9102
prom.MonitorKubernetesPodsPath = "/metrics" prom.MonitorKubernetesPodsPath = "/metrics"
prom.MonitorKubernetesPodsMethod = MonitorMethodAnnotations prom.MonitorKubernetesPodsMethod = MonitorMethodAnnotations
prom.kubernetesPods = map[PodID]URLAndAddress{}
return prom return prom
} }
@ -129,7 +130,7 @@ func TestScrapeURLAnnotationsCustomPathWithFragment(t *testing.T) {
} }
func TestAddPod(t *testing.T) { func TestAddPod(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}} prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -148,7 +149,7 @@ func TestAddPodScrapeConfig(t *testing.T) {
} }
func TestAddMultipleDuplicatePods(t *testing.T) { func TestAddMultipleDuplicatePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}} prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -161,7 +162,7 @@ func TestAddMultipleDuplicatePods(t *testing.T) {
} }
func TestAddMultiplePods(t *testing.T) { func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}} prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -173,7 +174,7 @@ func TestAddMultiplePods(t *testing.T) {
} }
func TestDeletePods(t *testing.T) { func TestDeletePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}} prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -185,7 +186,7 @@ func TestDeletePods(t *testing.T) {
} }
func TestKeepDefaultNamespaceLabelName(t *testing.T) { func TestKeepDefaultNamespaceLabelName(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}} prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -197,7 +198,7 @@ func TestKeepDefaultNamespaceLabelName(t *testing.T) {
} }
func TestChangeNamespaceLabelName(t *testing.T) { func TestChangeNamespaceLabelName(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace"} prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace", kubernetesPods: map[PodID]URLAndAddress{}}
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -296,7 +297,7 @@ func TestAnnotationFilters(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}} prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom.PodAnnotationInclude = tc.include prom.PodAnnotationInclude = tc.include
prom.PodAnnotationExclude = tc.exclude prom.PodAnnotationExclude = tc.exclude
require.NoError(t, prom.initFilters()) require.NoError(t, prom.initFilters())
@ -341,7 +342,7 @@ func TestLabelFilters(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}} prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom.PodLabelInclude = tc.include prom.PodLabelInclude = tc.include
prom.PodLabelExclude = tc.exclude prom.PodLabelExclude = tc.exclude
require.NoError(t, prom.initFilters()) require.NoError(t, prom.initFilters())

View File

@ -224,6 +224,8 @@ func (p *Prometheus) Init() error {
"Accept": acceptHeader, "Accept": acceptHeader,
} }
p.kubernetesPods = map[PodID]URLAndAddress{}
return nil return nil
} }