fix: decode Prometheus scrape path from Kuberentes labels (#9662)

This commit is contained in:
Alexander Krantz 2021-10-18 14:47:45 -07:00 committed by GitHub
parent d729c0a6e4
commit 2e216825db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 40 deletions

View File

@ -5,7 +5,6 @@ import (
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@ -295,12 +294,15 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil { if p.kubernetesPods == nil {
p.kubernetesPods = map[string]URLAndAddress{} p.kubernetesPods = map[string]URLAndAddress{}
} }
targetURL := getScrapeURL(pod) targetURL, err := getScrapeURL(pod)
if targetURL == nil { if err != nil {
p.Log.Errorf("could not parse URL: %s", err)
return
} else if targetURL == nil {
return return
} }
log.Printf("D! [inputs.prometheus] will scrape metrics from %q", *targetURL) p.Log.Debugf("will scrape metrics from %q", targetURL.String())
// add annotation as metrics tags // add annotation as metrics tags
tags := pod.Annotations tags := pod.Annotations
if tags == nil { if tags == nil {
@ -312,12 +314,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
for k, v := range pod.Labels { for k, v := range pod.Labels {
tags[k] = v tags[k] = v
} }
URL, err := url.Parse(*targetURL) podURL := p.AddressToURL(targetURL, targetURL.Hostname())
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %q: %s", *targetURL, err.Error())
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
// Locks earlier if using cAdvisor calls - makes a new list each time // Locks earlier if using cAdvisor calls - makes a new list each time
// rather than updating and removing from the same list // rather than updating and removing from the same list
@ -327,22 +324,22 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
} }
p.kubernetesPods[podURL.String()] = URLAndAddress{ p.kubernetesPods[podURL.String()] = URLAndAddress{
URL: podURL, URL: podURL,
Address: URL.Hostname(), Address: targetURL.Hostname(),
OriginalURL: URL, OriginalURL: targetURL,
Tags: tags, Tags: tags,
} }
} }
func getScrapeURL(pod *corev1.Pod) *string { func getScrapeURL(pod *corev1.Pod) (*url.URL, error) {
ip := pod.Status.PodIP ip := pod.Status.PodIP
if ip == "" { if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod // return as if scrape was disabled, we will be notified again once the pod
// has an IP // has an IP
return nil return nil, nil
} }
scheme := pod.Annotations["prometheus.io/scheme"] scheme := pod.Annotations["prometheus.io/scheme"]
path := pod.Annotations["prometheus.io/path"] pathAndQuery := pod.Annotations["prometheus.io/path"]
port := pod.Annotations["prometheus.io/port"] port := pod.Annotations["prometheus.io/port"]
if scheme == "" { if scheme == "" {
@ -351,34 +348,36 @@ func getScrapeURL(pod *corev1.Pod) *string {
if port == "" { if port == "" {
port = "9102" port = "9102"
} }
if path == "" { if pathAndQuery == "" {
path = "/metrics" pathAndQuery = "/metrics"
} }
u := &url.URL{ base, err := url.Parse(pathAndQuery)
Scheme: scheme, if err != nil {
Host: net.JoinHostPort(ip, port), return nil, err
Path: path,
} }
x := u.String() base.Scheme = scheme
base.Host = net.JoinHostPort(ip, port)
return &x return base, nil
} }
func unregisterPod(pod *corev1.Pod, p *Prometheus) { func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := getScrapeURL(pod) targetURL, err := getScrapeURL(pod)
if url == nil { if err != nil {
p.Log.Errorf("failed to parse url: %s", err)
return
} else if targetURL == nil {
return return
} }
log.Printf("D! [inputs.prometheus] registered a delete request for %q in namespace %q", p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
pod.Name, pod.Namespace)
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
if _, ok := p.kubernetesPods[*url]; ok { if _, ok := p.kubernetesPods[targetURL.String()]; ok {
delete(p.kubernetesPods, *url) delete(p.kubernetesPods, targetURL.String())
log.Printf("D! [inputs.prometheus] will stop scraping for %q", *url) p.Log.Debugf("will stop scraping for %q", targetURL.String())
} }
} }

View File

@ -15,7 +15,8 @@ import (
func TestScrapeURLNoAnnotations(t *testing.T) { func TestScrapeURLNoAnnotations(t *testing.T) {
p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}} p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}}
p.Annotations = map[string]string{} p.Annotations = map[string]string{}
url := getScrapeURL(p) url, err := getScrapeURL(p)
assert.NoError(t, err)
assert.Nil(t, url) assert.Nil(t, url)
} }
@ -23,36 +24,57 @@ func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}} p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}}
p.Name = "myPod" p.Name = "myPod"
p.Annotations = map[string]string{"prometheus.io/scrape": "false"} p.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := getScrapeURL(p) url, err := getScrapeURL(p)
assert.NoError(t, err)
assert.Nil(t, url) assert.Nil(t, url)
} }
func TestScrapeURLAnnotations(t *testing.T) { func TestScrapeURLAnnotations(t *testing.T) {
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := getScrapeURL(p) url, err := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) assert.NoError(t, err)
assert.Equal(t, "http://127.0.0.1:9102/metrics", url.String())
} }
func TestScrapeURLAnnotationsCustomPort(t *testing.T) { func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := getScrapeURL(p) url, err := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) assert.NoError(t, err)
assert.Equal(t, "http://127.0.0.1:9000/metrics", url.String())
} }
func TestScrapeURLAnnotationsCustomPath(t *testing.T) { func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := getScrapeURL(p) url, err := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) assert.NoError(t, err)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", url.String())
} }
func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod() p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := getScrapeURL(p) url, err := getScrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) assert.NoError(t, err)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", url.String())
}
func TestScrapeURLAnnotationsCustomPathWithQueryParameters(t *testing.T) {
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/v1/agent/metrics?format=prometheus"}
url, err := getScrapeURL(p)
assert.NoError(t, err)
assert.Equal(t, "http://127.0.0.1:9102/v1/agent/metrics?format=prometheus", url.String())
}
func TestScrapeURLAnnotationsCustomPathWithFragment(t *testing.T) {
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/v1/agent/metrics#prometheus"}
url, err := getScrapeURL(p)
assert.NoError(t, err)
assert.Equal(t, "http://127.0.0.1:9102/v1/agent/metrics#prometheus", url.String())
} }
func TestAddPod(t *testing.T) { func TestAddPod(t *testing.T) {