feat(inputs.prometheus): Use namespace annotations to filter pods to be scraped (#12777)

This commit is contained in:
Maxim Ivanov 2023-03-07 08:26:38 +00:00 committed by GitHub
parent 5cc55fa900
commit ee682539e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 153 additions and 53 deletions

View File

@ -14,6 +14,15 @@ type TagFilter struct {
filter filter.Filter
}
func (tf *TagFilter) Compile() error {
f, err := filter.Compile(tf.Values)
if err != nil {
return err
}
tf.filter = f
return nil
}
// Filter containing drop/pass and tagdrop/tagpass rules
type Filter struct {
NameDrop []string
@ -79,15 +88,14 @@ func (f *Filter) Compile() error {
return fmt.Errorf("error compiling 'taginclude', %w", err)
}
for i := range f.TagDropFilters {
f.TagDropFilters[i].filter, err = filter.Compile(f.TagDropFilters[i].Values)
if err != nil {
for i := 0; i < len(f.TagDropFilters); i++ {
if err := f.TagDropFilters[i].Compile(); err != nil {
return fmt.Errorf("error compiling 'tagdrop', %w", err)
}
}
for i := range f.TagPassFilters {
f.TagPassFilters[i].filter, err = filter.Compile(f.TagPassFilters[i].Values)
if err != nil {
for i := 0; i < len(f.TagPassFilters); i++ {
if err := f.TagPassFilters[i].Compile(); err != nil {
return fmt.Errorf("error compiling 'tagpass', %w", err)
}
}
@ -166,51 +174,7 @@ func (f *Filter) shouldFieldPass(key string) bool {
// shouldTagsPass returns true if the metric should pass, false if it should drop
// based on the tagdrop/tagpass filter parameters
func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool {
pass := func(f *Filter) bool {
for _, pat := range f.TagPassFilters {
if pat.filter == nil {
continue
}
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return true
}
}
}
}
return false
}
drop := func(f *Filter) bool {
for _, pat := range f.TagDropFilters {
if pat.filter == nil {
continue
}
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return false
}
}
}
}
return true
}
// Add additional logic in case where both parameters are set.
// see: https://github.com/influxdata/telegraf/issues/2860
if f.TagPassFilters != nil && f.TagDropFilters != nil {
// return true only in case when tag pass and won't be dropped (true, true).
// in case when the same tag should be passed and dropped it will be dropped (true, false).
return pass(f) && drop(f)
} else if f.TagPassFilters != nil {
return pass(f)
} else if f.TagDropFilters != nil {
return drop(f)
}
return true
return ShouldTagsPass(f.TagPassFilters, f.TagDropFilters, tags)
}
// filterFields removes fields according to fieldpass/fielddrop.
@ -252,3 +216,51 @@ func (f *Filter) filterTags(metric telegraf.Metric) {
metric.RemoveTag(key)
}
}
func ShouldTagsPass(passFilters []TagFilter, dropFilters []TagFilter, tags []*telegraf.Tag) bool {
pass := func(tpf []TagFilter) bool {
for _, pat := range tpf {
if pat.filter == nil {
continue
}
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return true
}
}
}
}
return false
}
drop := func(tdf []TagFilter) bool {
for _, pat := range tdf {
if pat.filter == nil {
continue
}
for _, tag := range tags {
if tag.Key == pat.Name {
if pat.filter.Match(tag.Value) {
return false
}
}
}
}
return true
}
// Add additional logic in case where both parameters are set.
// see: https://github.com/influxdata/telegraf/issues/2860
if passFilters != nil && dropFilters != nil {
// return true only in case when tag pass and won't be dropped (true, true).
// in case when the same tag should be passed and dropped it will be dropped (true, false).
return pass(passFilters) && drop(dropFilters)
} else if passFilters != nil {
return pass(passFilters)
} else if dropFilters != nil {
return drop(dropFilters)
}
return true
}

View File

@ -153,6 +153,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Set to true/false to enforce TLS being enabled/disabled. If not set,
## enable TLS only if any of the other options are specified.
# tls_enable = true
## Control pod scraping based on pod namespace annotations
## Pass and drop here act like tagpass and tagdrop, but instead
## of filtering metrics they filters pod candidates for scraping
#[inputs.prometheus.namespace_annotation_pass]
# annotation_key = ["value1", "value2"]
#[inputs.prometheus.namespace_annotation_drop]
# some_annotation_key = ["dont-scrape"]
```
`urls` can contain a unix socket as well. If a different path is required

View File

@ -5,6 +5,8 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"net"
"net/http"
"net/url"
@ -135,6 +137,8 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients
informerfactory = informers.NewSharedInformerFactory(clientset, resyncinterval)
}
p.nsStore = informerfactory.Core().V1().Namespaces().Informer().GetStore()
podinformer := informerfactory.Core().V1().Pods()
podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(newObj interface{}) {
@ -301,12 +305,46 @@ func podHasMatchingFieldSelector(pod *corev1.Pod, fieldSelector fields.Selector)
return fieldSelector.Matches(fieldsSet)
}
// Get corev1.Namespace object by name
func getNamespaceObject(name string, p *Prometheus) *corev1.Namespace {
if p.nsStore == nil { // can happen in tests
return nil
}
nsObj, exists, err := p.nsStore.GetByKey(name)
if err != nil {
p.Log.Errorf("Err fetching namespace '%s': %v", name, err)
return nil
} else if !exists {
return nil // can't happen
}
ns, ok := nsObj.(*corev1.Namespace)
if !ok {
p.Log.Errorf("[BUG] received unexpected object: %v", nsObj)
return nil
}
return ns
}
func namespaceAnnotationMatch(nsName string, p *Prometheus) bool {
ns := getNamespaceObject(nsName, p)
if ns == nil {
// in case of errors or other problems let it through
return true
}
tags := make([]*telegraf.Tag, 0, len(ns.Annotations))
for k, v := range ns.Annotations {
tags = append(tags, &telegraf.Tag{Key: k, Value: v})
}
return models.ShouldTagsPass(p.nsAnnotationPass, p.nsAnnotationDrop, tags)
}
/*
* If a namespace is specified and the pod doesn't have that namespace, return false
* Else return true
*/
func podHasMatchingNamespace(pod *corev1.Pod, p *Prometheus) bool {
return !(p.PodNamespace != "" && pod.Namespace != p.PodNamespace)
return p.PodNamespace == "" || pod.Namespace == p.PodNamespace
}
func podReady(pod *corev1.Pod) bool {
@ -361,6 +399,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
Address: targetURL.Hostname(),
OriginalURL: targetURL,
Tags: tags,
Namespace: pod.GetNamespace(),
}
}

View File

@ -6,7 +6,9 @@ import (
_ "embed"
"errors"
"fmt"
"github.com/influxdata/telegraf/models"
"io"
"k8s.io/client-go/tools/cache"
"net"
"net/http"
"net/url"
@ -87,6 +89,11 @@ type Prometheus struct {
client *http.Client
headers map[string]string
nsStore cache.Store
nsAnnotationPass []models.TagFilter
nsAnnotationDrop []models.TagFilter
// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodScrapeScope string `toml:"pod_scrape_scope"`
@ -109,6 +116,9 @@ type Prometheus struct {
MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"`
MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"`
NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"`
NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"`
// Only for monitor_kubernetes_pods=true
CacheRefreshInterval int `toml:"cache_refresh_interval"`
@ -163,6 +173,26 @@ func (p *Prometheus) Init() error {
p.Log.Infof("Using the label selector: %v and field selector: %v", p.podLabelSelector, p.podFieldSelector)
}
for k, vs := range p.NamespaceAnnotationPass {
tagFilter := models.TagFilter{}
tagFilter.Name = k
tagFilter.Values = append(tagFilter.Values, vs...)
if err := tagFilter.Compile(); err != nil {
return fmt.Errorf("error compiling 'namespace_annotation_pass', %w", err)
}
p.nsAnnotationPass = append(p.nsAnnotationPass, tagFilter)
}
for k, vs := range p.NamespaceAnnotationDrop {
tagFilter := models.TagFilter{}
tagFilter.Name = k
tagFilter.Values = append(tagFilter.Values, vs...)
if err := tagFilter.Compile(); err != nil {
return fmt.Errorf("error compiling 'namespace_annotation_drop', %w", err)
}
p.nsAnnotationDrop = append(p.nsAnnotationDrop, tagFilter)
}
ctx := context.Background()
p.HTTPClientConfig.Timeout = p.ResponseTimeout
client, err := p.HTTPClientConfig.CreateClient(ctx, p.Log)
@ -202,6 +232,7 @@ type URLAndAddress struct {
URL *url.URL
Address string
Tags map[string]string
Namespace string
}
func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
@ -223,8 +254,10 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
}
// loop through all pods scraped via the prometheus annotation on the pods
for _, v := range p.kubernetesPods {
if namespaceAnnotationMatch(v.Namespace, p) {
allURLs[v.URL.String()] = v
}
}
for _, service := range p.KubernetesServices {
address, err := url.Parse(service)

View File

@ -136,3 +136,11 @@
## Set to true/false to enforce TLS being enabled/disabled. If not set,
## enable TLS only if any of the other options are specified.
# tls_enable = true
## Control pod scraping based on pod namespace annotations
## Pass and drop here act like tagpass and tagdrop, but instead
## of filtering metrics they filters pod candidates for scraping
#[inputs.prometheus.namespace_annotation_pass]
# annotation_key = ["value1", "value2"]
#[inputs.prometheus.namespace_annotation_drop]
# some_annotation_key = ["dont-scrape"]