diff --git a/models/filter.go b/models/filter.go index 0b3172336..4e2f614e3 100644 --- a/models/filter.go +++ b/models/filter.go @@ -14,6 +14,15 @@ type TagFilter struct { filter filter.Filter } +func (tf *TagFilter) Compile() error { + f, err := filter.Compile(tf.Values) + if err != nil { + return err + } + tf.filter = f + return nil +} + // Filter containing drop/pass and tagdrop/tagpass rules type Filter struct { NameDrop []string @@ -79,15 +88,14 @@ func (f *Filter) Compile() error { return fmt.Errorf("error compiling 'taginclude', %w", err) } - for i := range f.TagDropFilters { - f.TagDropFilters[i].filter, err = filter.Compile(f.TagDropFilters[i].Values) - if err != nil { + for i := 0; i < len(f.TagDropFilters); i++ { + if err := f.TagDropFilters[i].Compile(); err != nil { return fmt.Errorf("error compiling 'tagdrop', %w", err) } } - for i := range f.TagPassFilters { - f.TagPassFilters[i].filter, err = filter.Compile(f.TagPassFilters[i].Values) - if err != nil { + + for i := 0; i < len(f.TagPassFilters); i++ { + if err := f.TagPassFilters[i].Compile(); err != nil { return fmt.Errorf("error compiling 'tagpass', %w", err) } } @@ -166,51 +174,7 @@ func (f *Filter) shouldFieldPass(key string) bool { // shouldTagsPass returns true if the metric should pass, false if it should drop // based on the tagdrop/tagpass filter parameters func (f *Filter) shouldTagsPass(tags []*telegraf.Tag) bool { - pass := func(f *Filter) bool { - for _, pat := range f.TagPassFilters { - if pat.filter == nil { - continue - } - for _, tag := range tags { - if tag.Key == pat.Name { - if pat.filter.Match(tag.Value) { - return true - } - } - } - } - return false - } - - drop := func(f *Filter) bool { - for _, pat := range f.TagDropFilters { - if pat.filter == nil { - continue - } - for _, tag := range tags { - if tag.Key == pat.Name { - if pat.filter.Match(tag.Value) { - return false - } - } - } - } - return true - } - - // Add additional logic in case where both parameters are set. - // see: https://github.com/influxdata/telegraf/issues/2860 - if f.TagPassFilters != nil && f.TagDropFilters != nil { - // return true only in case when tag pass and won't be dropped (true, true). - // in case when the same tag should be passed and dropped it will be dropped (true, false). - return pass(f) && drop(f) - } else if f.TagPassFilters != nil { - return pass(f) - } else if f.TagDropFilters != nil { - return drop(f) - } - - return true + return ShouldTagsPass(f.TagPassFilters, f.TagDropFilters, tags) } // filterFields removes fields according to fieldpass/fielddrop. @@ -252,3 +216,51 @@ func (f *Filter) filterTags(metric telegraf.Metric) { metric.RemoveTag(key) } } + +func ShouldTagsPass(passFilters []TagFilter, dropFilters []TagFilter, tags []*telegraf.Tag) bool { + pass := func(tpf []TagFilter) bool { + for _, pat := range tpf { + if pat.filter == nil { + continue + } + for _, tag := range tags { + if tag.Key == pat.Name { + if pat.filter.Match(tag.Value) { + return true + } + } + } + } + return false + } + + drop := func(tdf []TagFilter) bool { + for _, pat := range tdf { + if pat.filter == nil { + continue + } + for _, tag := range tags { + if tag.Key == pat.Name { + if pat.filter.Match(tag.Value) { + return false + } + } + } + } + return true + } + + // Add additional logic in case where both parameters are set. + // see: https://github.com/influxdata/telegraf/issues/2860 + if passFilters != nil && dropFilters != nil { + // return true only in case when tag pass and won't be dropped (true, true). + // in case when the same tag should be passed and dropped it will be dropped (true, false). + return pass(passFilters) && drop(dropFilters) + } else if passFilters != nil { + return pass(passFilters) + } else if dropFilters != nil { + return drop(dropFilters) + } + + return true +} diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 3eac2b930..1d3b7bf12 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -153,6 +153,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Set to true/false to enforce TLS being enabled/disabled. If not set, ## enable TLS only if any of the other options are specified. # tls_enable = true + + ## Control pod scraping based on pod namespace annotations + ## Pass and drop here act like tagpass and tagdrop, but instead + ## of filtering metrics they filters pod candidates for scraping + #[inputs.prometheus.namespace_annotation_pass] + # annotation_key = ["value1", "value2"] + #[inputs.prometheus.namespace_annotation_drop] + # some_annotation_key = ["dont-scrape"] ``` `urls` can contain a unix socket as well. If a different path is required diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index e95a7a20b..3a2ef9635 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "encoding/json" "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/models" "net" "net/http" "net/url" @@ -135,6 +137,8 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients informerfactory = informers.NewSharedInformerFactory(clientset, resyncinterval) } + p.nsStore = informerfactory.Core().V1().Namespaces().Informer().GetStore() + podinformer := informerfactory.Core().V1().Pods() podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(newObj interface{}) { @@ -301,12 +305,46 @@ func podHasMatchingFieldSelector(pod *corev1.Pod, fieldSelector fields.Selector) return fieldSelector.Matches(fieldsSet) } +// Get corev1.Namespace object by name +func getNamespaceObject(name string, p *Prometheus) *corev1.Namespace { + if p.nsStore == nil { // can happen in tests + return nil + } + nsObj, exists, err := p.nsStore.GetByKey(name) + if err != nil { + p.Log.Errorf("Err fetching namespace '%s': %v", name, err) + return nil + } else if !exists { + return nil // can't happen + } + ns, ok := nsObj.(*corev1.Namespace) + if !ok { + p.Log.Errorf("[BUG] received unexpected object: %v", nsObj) + return nil + } + return ns +} + +func namespaceAnnotationMatch(nsName string, p *Prometheus) bool { + ns := getNamespaceObject(nsName, p) + if ns == nil { + // in case of errors or other problems let it through + return true + } + + tags := make([]*telegraf.Tag, 0, len(ns.Annotations)) + for k, v := range ns.Annotations { + tags = append(tags, &telegraf.Tag{Key: k, Value: v}) + } + return models.ShouldTagsPass(p.nsAnnotationPass, p.nsAnnotationDrop, tags) +} + /* * If a namespace is specified and the pod doesn't have that namespace, return false * Else return true */ func podHasMatchingNamespace(pod *corev1.Pod, p *Prometheus) bool { - return !(p.PodNamespace != "" && pod.Namespace != p.PodNamespace) + return p.PodNamespace == "" || pod.Namespace == p.PodNamespace } func podReady(pod *corev1.Pod) bool { @@ -361,6 +399,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { Address: targetURL.Hostname(), OriginalURL: targetURL, Tags: tags, + Namespace: pod.GetNamespace(), } } diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index c5423ca48..7d0e9acba 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -6,7 +6,9 @@ import ( _ "embed" "errors" "fmt" + "github.com/influxdata/telegraf/models" "io" + "k8s.io/client-go/tools/cache" "net" "net/http" "net/url" @@ -87,6 +89,11 @@ type Prometheus struct { client *http.Client headers map[string]string + nsStore cache.Store + + nsAnnotationPass []models.TagFilter + nsAnnotationDrop []models.TagFilter + // Should we scrape Kubernetes services for prometheus annotations MonitorPods bool `toml:"monitor_kubernetes_pods"` PodScrapeScope string `toml:"pod_scrape_scope"` @@ -109,6 +116,9 @@ type Prometheus struct { MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"` MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"` + NamespaceAnnotationPass map[string][]string `toml:"namespace_annotation_pass"` + NamespaceAnnotationDrop map[string][]string `toml:"namespace_annotation_drop"` + // Only for monitor_kubernetes_pods=true CacheRefreshInterval int `toml:"cache_refresh_interval"` @@ -163,6 +173,26 @@ func (p *Prometheus) Init() error { p.Log.Infof("Using the label selector: %v and field selector: %v", p.podLabelSelector, p.podFieldSelector) } + for k, vs := range p.NamespaceAnnotationPass { + tagFilter := models.TagFilter{} + tagFilter.Name = k + tagFilter.Values = append(tagFilter.Values, vs...) + if err := tagFilter.Compile(); err != nil { + return fmt.Errorf("error compiling 'namespace_annotation_pass', %w", err) + } + p.nsAnnotationPass = append(p.nsAnnotationPass, tagFilter) + } + + for k, vs := range p.NamespaceAnnotationDrop { + tagFilter := models.TagFilter{} + tagFilter.Name = k + tagFilter.Values = append(tagFilter.Values, vs...) + if err := tagFilter.Compile(); err != nil { + return fmt.Errorf("error compiling 'namespace_annotation_drop', %w", err) + } + p.nsAnnotationDrop = append(p.nsAnnotationDrop, tagFilter) + } + ctx := context.Background() p.HTTPClientConfig.Timeout = p.ResponseTimeout client, err := p.HTTPClientConfig.CreateClient(ctx, p.Log) @@ -202,6 +232,7 @@ type URLAndAddress struct { URL *url.URL Address string Tags map[string]string + Namespace string } func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { @@ -223,7 +254,9 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { } // loop through all pods scraped via the prometheus annotation on the pods for _, v := range p.kubernetesPods { - allURLs[v.URL.String()] = v + if namespaceAnnotationMatch(v.Namespace, p) { + allURLs[v.URL.String()] = v + } } for _, service := range p.KubernetesServices { diff --git a/plugins/inputs/prometheus/sample.conf b/plugins/inputs/prometheus/sample.conf index c74370754..538acf517 100644 --- a/plugins/inputs/prometheus/sample.conf +++ b/plugins/inputs/prometheus/sample.conf @@ -136,3 +136,11 @@ ## Set to true/false to enforce TLS being enabled/disabled. If not set, ## enable TLS only if any of the other options are specified. # tls_enable = true + + ## Control pod scraping based on pod namespace annotations + ## Pass and drop here act like tagpass and tagdrop, but instead + ## of filtering metrics they filters pod candidates for scraping + #[inputs.prometheus.namespace_annotation_pass] + # annotation_key = ["value1", "value2"] + #[inputs.prometheus.namespace_annotation_drop] + # some_annotation_key = ["dont-scrape"]