feat(inputs.prometheus): Control which pod metadata is added as tags (#12851)
This commit is contained in:
parent
789a49858e
commit
5fdeae1ab1
|
|
@ -161,14 +161,7 @@ func (f *Filter) shouldNamePass(key string) bool {
|
|||
// shouldFieldPass returns true if the metric should pass, false if it should drop
|
||||
// based on the drop/pass filter parameters
|
||||
func (f *Filter) shouldFieldPass(key string) bool {
|
||||
if f.fieldPassFilter != nil && f.fieldDropFilter != nil {
|
||||
return f.fieldPassFilter.Match(key) && !f.fieldDropFilter.Match(key)
|
||||
} else if f.fieldPassFilter != nil {
|
||||
return f.fieldPassFilter.Match(key)
|
||||
} else if f.fieldDropFilter != nil {
|
||||
return !f.fieldDropFilter.Match(key)
|
||||
}
|
||||
return true
|
||||
return ShouldPassFilters(f.fieldPassFilter, f.fieldDropFilter, key)
|
||||
}
|
||||
|
||||
// shouldTagsPass returns true if the metric should pass, false if it should drop
|
||||
|
|
@ -217,6 +210,17 @@ func (f *Filter) filterTags(metric telegraf.Metric) {
|
|||
}
|
||||
}
|
||||
|
||||
func ShouldPassFilters(include filter.Filter, exclude filter.Filter, key string) bool {
|
||||
if include != nil && exclude != nil {
|
||||
return include.Match(key) && !exclude.Match(key)
|
||||
} else if include != nil {
|
||||
return include.Match(key)
|
||||
} else if exclude != nil {
|
||||
return !exclude.Match(key)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func ShouldTagsPass(passFilters []TagFilter, dropFilters []TagFilter, tags []*telegraf.Tag) bool {
|
||||
pass := func(tpf []TagFilter) bool {
|
||||
for _, pat := range tpf {
|
||||
|
|
|
|||
|
|
@ -95,6 +95,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
# eg. To scrape pods on a specific node
|
||||
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
|
||||
|
||||
## Filter which pod annotations and labels will be added to metric tags
|
||||
#
|
||||
# pod_annotation_include = ["annotation-key-1"]
|
||||
# pod_annotation_exclude = ["exclude-me"]
|
||||
# pod_label_include = ["label-key-1"]
|
||||
# pod_label_exclude = ["exclude-me"]
|
||||
|
||||
# cache refresh interval to set the interval for re-sync of pods list.
|
||||
# Default is 60 minutes.
|
||||
# cache_refresh_interval = 60
|
||||
|
|
|
|||
|
|
@ -5,8 +5,6 @@ import (
|
|||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
|
@ -15,6 +13,9 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
|
|
@ -369,10 +370,13 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
|
|||
}
|
||||
|
||||
p.Log.Debugf("will scrape metrics from %q", targetURL.String())
|
||||
// add annotation as metrics tags
|
||||
tags := pod.Annotations
|
||||
if tags == nil {
|
||||
tags = map[string]string{}
|
||||
tags := map[string]string{}
|
||||
|
||||
// add annotation as metrics tags, subject to include/exclude filters
|
||||
for k, v := range pod.Annotations {
|
||||
if models.ShouldPassFilters(p.podAnnotationIncludeFilter, p.podAnnotationExcludeFilter, k) {
|
||||
tags[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
tags["pod_name"] = pod.Name
|
||||
|
|
@ -382,9 +386,11 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
|
|||
}
|
||||
tags[podNamespace] = pod.Namespace
|
||||
|
||||
// add labels as metrics tags
|
||||
// add labels as metrics tags, subject to include/exclude filters
|
||||
for k, v := range pod.Labels {
|
||||
tags[k] = v
|
||||
if models.ShouldPassFilters(p.podLabelIncludeFilter, p.podLabelExcludeFilter, k) {
|
||||
tags[k] = v
|
||||
}
|
||||
}
|
||||
podURL := p.AddressToURL(targetURL, targetURL.Hostname())
|
||||
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"testing"
|
||||
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
|
@ -264,6 +265,96 @@ func TestInvalidFieldSelector(t *testing.T) {
|
|||
require.NotEqual(t, err, nil)
|
||||
}
|
||||
|
||||
func TestAnnotationFilters(t *testing.T) {
|
||||
p := pod()
|
||||
p.Annotations = map[string]string{
|
||||
"prometheus.io/scrape": "true",
|
||||
"includeme": "true",
|
||||
"excludeme": "true",
|
||||
"neutral": "true",
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
include []string
|
||||
exclude []string
|
||||
expectedTags []string
|
||||
}{
|
||||
{"Just include",
|
||||
[]string{"includeme"},
|
||||
nil,
|
||||
[]string{"includeme"}},
|
||||
{"Just exclude",
|
||||
nil,
|
||||
[]string{"excludeme"},
|
||||
[]string{"includeme", "neutral"}},
|
||||
{"Include & exclude",
|
||||
[]string{"includeme"},
|
||||
[]string{"exludeme"},
|
||||
[]string{"includeme"}},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
prom := &Prometheus{Log: testutil.Logger{}}
|
||||
prom.PodAnnotationInclude = tc.include
|
||||
prom.PodAnnotationExclude = tc.exclude
|
||||
require.NoError(t, prom.initFilters())
|
||||
registerPod(p, prom)
|
||||
for _, pd := range prom.kubernetesPods {
|
||||
for _, tagKey := range tc.expectedTags {
|
||||
require.Contains(t, pd.Tags, tagKey)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLabelFilters(t *testing.T) {
|
||||
p := pod()
|
||||
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
|
||||
p.Labels = map[string]string{
|
||||
"includeme": "true",
|
||||
"excludeme": "true",
|
||||
"neutral": "true",
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
include []string
|
||||
exclude []string
|
||||
expectedTags []string
|
||||
}{
|
||||
{"Just include",
|
||||
[]string{"includeme"},
|
||||
nil,
|
||||
[]string{"includeme"}},
|
||||
{"Just exclude",
|
||||
nil,
|
||||
[]string{"excludeme"},
|
||||
[]string{"includeme", "neutral"}},
|
||||
{"Include & exclude",
|
||||
[]string{"includeme"},
|
||||
[]string{"exludeme"},
|
||||
[]string{"includeme"}},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
prom := &Prometheus{Log: testutil.Logger{}}
|
||||
prom.PodLabelInclude = tc.include
|
||||
prom.PodLabelExclude = tc.exclude
|
||||
require.NoError(t, prom.initFilters())
|
||||
registerPod(p, prom)
|
||||
for _, pd := range prom.kubernetesPods {
|
||||
for _, tagKey := range tc.expectedTags {
|
||||
require.Contains(t, pd.Tags, tagKey)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func pod() *corev1.Pod {
|
||||
p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}, Status: corev1.PodStatus{}, Spec: corev1.PodSpec{}}
|
||||
p.Status.PodIP = "127.0.0.1"
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
|
|
@ -119,6 +120,17 @@ type Prometheus struct {
|
|||
NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"`
|
||||
NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"`
|
||||
|
||||
PodAnnotationInclude []string `toml:"pod_annotation_include"`
|
||||
PodAnnotationExclude []string `toml:"pod_annotation_exclude"`
|
||||
|
||||
PodLabelInclude []string `toml:"pod_label_include"`
|
||||
PodLabelExclude []string `toml:"pod_label_exclude"`
|
||||
|
||||
podAnnotationIncludeFilter filter.Filter
|
||||
podAnnotationExcludeFilter filter.Filter
|
||||
podLabelIncludeFilter filter.Filter
|
||||
podLabelExcludeFilter filter.Filter
|
||||
|
||||
// Only for monitor_kubernetes_pods=true
|
||||
CacheRefreshInterval int `toml:"cache_refresh_interval"`
|
||||
|
||||
|
|
@ -193,6 +205,10 @@ func (p *Prometheus) Init() error {
|
|||
p.nsAnnotationDrop = append(p.nsAnnotationDrop, tagFilter)
|
||||
}
|
||||
|
||||
if err := p.initFilters(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if p.ResponseTimeout != 0 {
|
||||
p.HTTPClientConfig.Timeout = p.ResponseTimeout
|
||||
|
|
@ -211,6 +227,38 @@ func (p *Prometheus) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *Prometheus) initFilters() error {
|
||||
if p.PodAnnotationExclude != nil {
|
||||
podAnnotationExclude, err := filter.Compile(p.PodAnnotationExclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'pod_annotation_exclude': %w", err)
|
||||
}
|
||||
p.podAnnotationExcludeFilter = podAnnotationExclude
|
||||
}
|
||||
if p.PodAnnotationInclude != nil {
|
||||
podAnnotationInclude, err := filter.Compile(p.PodAnnotationInclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'pod_annotation_include': %w", err)
|
||||
}
|
||||
p.podAnnotationIncludeFilter = podAnnotationInclude
|
||||
}
|
||||
if p.PodLabelExclude != nil {
|
||||
podLabelExclude, err := filter.Compile(p.PodLabelExclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'pod_label_exclude': %w", err)
|
||||
}
|
||||
p.podLabelExcludeFilter = podLabelExclude
|
||||
}
|
||||
if p.PodLabelInclude != nil {
|
||||
podLabelInclude, err := filter.Compile(p.PodLabelInclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'pod_label_include': %w", err)
|
||||
}
|
||||
p.podLabelIncludeFilter = podLabelInclude
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
|
||||
host := address
|
||||
if u.Port() != "" {
|
||||
|
|
|
|||
|
|
@ -78,6 +78,13 @@
|
|||
# eg. To scrape pods on a specific node
|
||||
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
|
||||
|
||||
## Filter which pod annotations and labels will be added to metric tags
|
||||
#
|
||||
# pod_annotation_include = ["annotation-key-1"]
|
||||
# pod_annotation_exclude = ["exclude-me"]
|
||||
# pod_label_include = ["label-key-1"]
|
||||
# pod_label_exclude = ["exclude-me"]
|
||||
|
||||
# cache refresh interval to set the interval for re-sync of pods list.
|
||||
# Default is 60 minutes.
|
||||
# cache_refresh_interval = 60
|
||||
|
|
|
|||
Loading…
Reference in New Issue