From 1b74a25252a34eb213ae670c9186cdd20a8ea418 Mon Sep 17 00:00:00 2001
From: varunjain0606
Date: Fri, 19 May 2023 16:54:09 +0300
Subject: [PATCH] feat(inputs.kubernetes): Extend kube_inventory plugin to
 include resourcequota and secrets measurements and extend the node and pod
 measurements (#13040)

---
 plugins/inputs/kube_inventory/README.md      |  59 +++-
 plugins/inputs/kube_inventory/certificate.go |  94 +++++++
 plugins/inputs/kube_inventory/client.go      |  16 ++
 .../inputs/kube_inventory/kube_inventory.go  |   4 +
 plugins/inputs/kube_inventory/node.go        |  47 +++-
 plugins/inputs/kube_inventory/node_test.go   |  43 ++-
 plugins/inputs/kube_inventory/pod.go         |  38 +++
 plugins/inputs/kube_inventory/pod_test.go    | 260 ++++++++++++++++++
 .../inputs/kube_inventory/resourcequotas.go  |  77 ++++++
 .../kube_inventory/resourcequotas_test.go    | 113 ++++++++
 plugins/inputs/kubernetes/kubernetes.go      |  27 +-
 plugins/inputs/kubernetes/kubernetes_pods.go |  10 +
 12 files changed, 773 insertions(+), 15 deletions(-)
 create mode 100644 plugins/inputs/kube_inventory/certificate.go
 create mode 100644 plugins/inputs/kube_inventory/resourcequotas.go
 create mode 100644 plugins/inputs/kube_inventory/resourcequotas_test.go

diff --git a/plugins/inputs/kube_inventory/README.md b/plugins/inputs/kube_inventory/README.md
index d69731588..b16618f59 100644
--- a/plugins/inputs/kube_inventory/README.md
+++ b/plugins/inputs/kube_inventory/README.md
@@ -13,6 +13,7 @@ Kubernetes resources:
 - pods (containers)
 - services
 - statefulsets
+- resourcequotas
 
 Kubernetes is a fast moving project, with a new minor release every 3 months.
 As such, we will aim to maintain support only for versions that are supported
@@ -240,6 +241,9 @@ tls_key = "/run/telegraf-kubernetes-key"
 - kubernetes_node
   - tags:
     - node_name
+    - status
+    - condition
+    - cluster_namespace
   - fields:
     - capacity_cpu_cores
     - capacity_millicpu_cores
@@ -249,6 +253,9 @@ tls_key = "/run/telegraf-kubernetes-key"
     - allocatable_millicpu_cores
     - allocatable_memory_bytes
     - allocatable_pods
+    - status_condition
+    - spec_unschedulable
+    - node_count
 
 - kubernetes_persistentvolume
   - tags:
@@ -278,6 +285,7 @@ tls_key = "/run/telegraf-kubernetes-key"
     - phase
     - state
     - readiness
+    - condition
   - fields:
     - restarts_total
     - state_code
@@ -288,6 +296,7 @@ tls_key = "/run/telegraf-kubernetes-key"
     - resource_requests_memory_bytes
     - resource_limits_millicpu_units
     - resource_limits_memory_bytes
+    - status_condition
 
 - kubernetes_service
   - tags:
@@ -319,6 +328,49 @@ tls_key = "/run/telegraf-kubernetes-key"
     - spec_replicas
     - observed_generation
 
+- kubernetes_resourcequota
+  - tags:
+    - resource
+    - namespace
+  - fields:
+    - hard_cpu_limits
+    - hard_cpu_requests
+    - hard_memory_limits
+    - hard_memory_requests
+    - hard_pods
+    - used_cpu_limits
+    - used_cpu_requests
+    - used_memory_limits
+    - used_memory_requests
+    - used_pods
+
+- kubernetes_certificate
+  - tags:
+    - common_name
+    - signature_algorithm
+    - public_key_algorithm
+    - issuer_common_name
+    - san
+    - verification
+    - name
+    - namespace
+  - fields:
+    - age
+    - expiry
+    - startdate
+    - enddate
+    - verification_code
+
+### kubernetes node status `status`
+
+The node status `ready` can take three different values.
+
+| Tag value | Corresponding field value | Meaning  |
+| --------- | ------------------------- | -------- |
+| ready     | 0                         | NotReady |
+| ready     | 1                         | Ready    |
+| ready     | 2                         | Unknown  |
+
 ### pv `phase_type`
 
 The persistentvolume "phase" is saved in the `phase` tag with a correlated
 numeric field called `phase_type` corresponding with that tag value.
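The condition-to-code mapping documented in the node status table above is implemented in `plugins/inputs/kube_inventory/node.go` later in this patch. As a quick standalone illustration, the sketch below reproduces that translation; the `statusCondition` helper name is ours, not the plugin's:

```go
package main

import "fmt"

// statusCondition mirrors the mapping in the table above: a condition whose
// status is "True" maps to 1, "Unknown" maps to 2, anything else (including
// "False") maps to 0.
func statusCondition(status string) int {
	switch status {
	case "True":
		return 1
	case "Unknown":
		return 2
	default:
		return 0
	}
}

func main() {
	for _, s := range []string{"True", "False", "Unknown"} {
		fmt.Printf("status=%s -> status_condition=%d\n", s, statusCondition(s))
	}
}
```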
@@ -351,11 +403,16 @@ numeric field called `phase_type` corresponding with that tag value.
 kubernetes_configmap,configmap_name=envoy-config,namespace=default,resource_version=56593031 created=1544103867000000000i 1547597616000000000
 kubernetes_daemonset,daemonset_name=telegraf,selector_select1=s1,namespace=logging number_unavailable=0i,desired_number_scheduled=11i,number_available=11i,number_misscheduled=8i,number_ready=11i,updated_number_scheduled=11i,created=1527758699000000000i,generation=16i,current_number_scheduled=11i 1547597616000000000
 kubernetes_deployment,deployment_name=deployd,selector_select1=s1,namespace=default replicas_unavailable=0i,created=1544103082000000000i,replicas_available=1i 1547597616000000000
-kubernetes_node,node_name=ip-172-17-0-2.internal allocatable_pods=110i,capacity_memory_bytes=128837533696,capacity_pods=110i,capacity_cpu_cores=16i,allocatable_cpu_cores=16i,allocatable_memory_bytes=128732676096 1547597616000000000
+kubernetes_node,host=vjain node_count=8i 1628918652000000000
+kubernetes_node,condition=Ready,host=vjain,node_name=ip-172-17-0-2.internal,status=True status_condition=1i 1629177980000000000
+kubernetes_node,cluster_namespace=tools,condition=Ready,host=vjain,node_name=ip-172-17-0-2.internal,status=True allocatable_cpu_cores=4i,allocatable_memory_bytes=7186567168i,allocatable_millicpu_cores=4000i,allocatable_pods=110i,capacity_cpu_cores=4i,capacity_memory_bytes=7291424768i,capacity_millicpu_cores=4000i,capacity_pods=110i,spec_unschedulable=0i,status_condition=1i 1628918652000000000
+kubernetes_resourcequota,host=vjain,namespace=default,resource=pods-high hard_cpu=1000i,hard_memory=214748364800i,hard_pods=10i,used_cpu=0i,used_memory=0i,used_pods=0i 1629110393000000000
+kubernetes_resourcequota,host=vjain,namespace=default,resource=pods-low hard_cpu=5i,hard_memory=10737418240i,hard_pods=10i,used_cpu=0i,used_memory=0i,used_pods=0i 1629110393000000000
 kubernetes_persistentvolume,phase=Released,pv_name=pvc-aaaaaaaa-bbbb-cccc-1111-222222222222,storageclass=ebs-1-retain phase_type=3i 1547597616000000000
 kubernetes_persistentvolumeclaim,namespace=default,phase=Bound,pvc_name=data-etcd-0,selector_select1=s1,storageclass=ebs-1-retain phase_type=0i 1547597615000000000
 kubernetes_pod,namespace=default,node_name=ip-172-17-0-2.internal,pod_name=tick1 last_transition_time=1547578322000000000i,ready="false" 1547597616000000000
 kubernetes_service,cluster_ip=172.29.61.80,namespace=redis-cache-0001,port_name=redis,port_protocol=TCP,selector_app=myapp,selector_io.kompose.service=redis,selector_role=slave,service_name=redis-slave created=1588690034000000000i,generation=0i,port=6379i,target_port=0i 1547597616000000000
+kubernetes_pod_container,condition=Ready,host=vjain,pod_name=uefi-5997f76f69-xzljt,status=True status_condition=1i 1629177981000000000
 kubernetes_pod_container,container_name=telegraf,namespace=default,node_name=ip-172-17-0-2.internal,node_selector_node-role.kubernetes.io/compute=true,pod_name=tick1,phase=Running,state=running,readiness=ready resource_requests_cpu_units=0.1,resource_limits_memory_bytes=524288000,resource_limits_cpu_units=0.5,restarts_total=0i,state_code=0i,state_reason="",phase_reason="",resource_requests_memory_bytes=524288000 1547597616000000000
 kubernetes_statefulset,namespace=default,selector_select1=s1,statefulset_name=etcd replicas_updated=3i,spec_replicas=3i,observed_generation=1i,created=1544101669000000000i,generation=1i,replicas=3i,replicas_current=3i,replicas_ready=3i 1547597616000000000
 ```
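The resourcequota example values above are plain base-unit numbers: `hard_memory=214748364800i` is the byte value of a `200Gi` quota and `10737418240i` of a `10Gi` quota. A minimal sketch of that conversion, assuming `k8s.io/apimachinery` is on the module path; `resource.MustParse` stands in here for the plugin's `convertQuantity` helper:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Parse Kubernetes quantity strings and print their base-unit values,
	// matching the integers shown in the example output above.
	for _, q := range []string{"200Gi", "10Gi", "1000", "5"} {
		v := resource.MustParse(q)
		fmt.Printf("%s -> %d\n", q, v.Value())
	}
}
```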
diff --git a/plugins/inputs/kube_inventory/certificate.go b/plugins/inputs/kube_inventory/certificate.go
new file mode 100644
index 000000000..5cf360328
--- /dev/null
+++ b/plugins/inputs/kube_inventory/certificate.go
@@ -0,0 +1,94 @@
+package kube_inventory
+
+import (
+	"context"
+	"crypto/x509"
+	"encoding/pem"
+	"strings"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+
+	"github.com/influxdata/telegraf"
+)
+
+func collectSecrets(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
+	list, err := ki.client.getTLSSecrets(ctx)
+	if err != nil {
+		acc.AddError(err)
+		return
+	}
+	for _, i := range list.Items {
+		ki.gatherCertificates(i, acc)
+	}
+}
+
+func getFields(cert *x509.Certificate, now time.Time) map[string]interface{} {
+	age := int(now.Sub(cert.NotBefore).Seconds())
+	expiry := int(cert.NotAfter.Sub(now).Seconds())
+	startdate := cert.NotBefore.Unix()
+	enddate := cert.NotAfter.Unix()
+
+	fields := map[string]interface{}{
+		"age":       age,
+		"expiry":    expiry,
+		"startdate": startdate,
+		"enddate":   enddate,
+	}
+
+	return fields
+}
+
+func getTags(cert *x509.Certificate) map[string]string {
+	tags := map[string]string{
+		"common_name":          cert.Subject.CommonName,
+		"signature_algorithm":  cert.SignatureAlgorithm.String(),
+		"public_key_algorithm": cert.PublicKeyAlgorithm.String(),
+	}
+	tags["issuer_common_name"] = cert.Issuer.CommonName
+
+	san := append(cert.DNSNames, cert.EmailAddresses...)
+	for _, ip := range cert.IPAddresses {
+		san = append(san, ip.String())
+	}
+	for _, uri := range cert.URIs {
+		san = append(san, uri.String())
+	}
+	tags["san"] = strings.Join(san, ",")
+
+	return tags
+}
+
+func (ki *KubernetesInventory) gatherCertificates(r corev1.Secret, acc telegraf.Accumulator) {
+	now := time.Now()
+
+	for resourceName, val := range r.Data {
+		if resourceName != "tls.crt" {
+			continue
+		}
+		block, _ := pem.Decode(val)
+		if block == nil {
+			return
+		}
+		cert, err := x509.ParseCertificate(block.Bytes)
+		if err != nil {
+			return
+		}
+		fields := getFields(cert, now)
+		tags := getTags(cert)
+		tags["name"] = r.Name
+		tags["namespace"] = r.Namespace
+		opts := x509.VerifyOptions{
+			KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageAny},
+		}
+		_, err = cert.Verify(opts)
+		if err == nil {
+			tags["verification"] = "valid"
+			fields["verification_code"] = 0
+		} else {
+			tags["verification"] = "invalid"
+			fields["verification_code"] = 1
+		}
+		acc.AddFields(certificateMeasurement, fields, tags)
+	}
+}
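`certificate.go` above derives its fields purely from the certificate's `NotBefore`/`NotAfter` timestamps. The self-contained sketch below generates a throwaway self-signed certificate and prints the same four numbers `getFields` computes; the key type and common name are illustrative only:

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

func main() {
	// Generate a throwaway self-signed certificate valid from one hour ago
	// until 24 hours from now.
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		panic(err)
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: "example.local"}, // hypothetical name
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(24 * time.Hour),
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		panic(err)
	}
	cert, err := x509.ParseCertificate(der)
	if err != nil {
		panic(err)
	}

	// Compute the same values getFields derives for the plugin's metric.
	now := time.Now()
	fmt.Println("age (s):   ", int(now.Sub(cert.NotBefore).Seconds()))
	fmt.Println("expiry (s):", int(cert.NotAfter.Sub(now).Seconds()))
	fmt.Println("startdate: ", cert.NotBefore.Unix())
	fmt.Println("enddate:   ", cert.NotAfter.Unix())
}
```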
diff --git a/plugins/inputs/kube_inventory/client.go b/plugins/inputs/kube_inventory/client.go
index a47966eaf..e7c7a9206 100644
--- a/plugins/inputs/kube_inventory/client.go
+++ b/plugins/inputs/kube_inventory/client.go
@@ -8,6 +8,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	netv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 
@@ -120,3 +121,18 @@ func (c *client) getStatefulSets(ctx context.Context) (*appsv1.StatefulSetList, error) {
 	defer cancel()
 	return c.AppsV1().StatefulSets(c.namespace).List(ctx, metav1.ListOptions{})
 }
+
+func (c *client) getResourceQuotas(ctx context.Context) (*corev1.ResourceQuotaList, error) {
+	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	defer cancel()
+	return c.CoreV1().ResourceQuotas(c.namespace).List(ctx, metav1.ListOptions{})
+}
+
+func (c *client) getTLSSecrets(ctx context.Context) (*corev1.SecretList, error) {
+	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	defer cancel()
+	labelSelector := metav1.LabelSelector{MatchLabels: map[string]string{"type": "kubernetes.io/tls"}}
+	return c.CoreV1().Secrets(c.namespace).List(ctx, metav1.ListOptions{
+		FieldSelector: labels.Set(labelSelector.MatchLabels).String(),
+	})
+}
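One subtlety in `getTLSSecrets` above: the selector string is built from a `metav1.LabelSelector`, but passed as a `FieldSelector`. This works because `type` is a filterable field of Secret objects and `labels.Set{...}.String()` happens to render the same `type=kubernetes.io/tls` string a field selector expects. A sketch of an equivalent, more explicit construction using the `fields` helper package; this is our suggestion, not part of the patch:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	// Build the "type=kubernetes.io/tls" field selector explicitly.
	opts := metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("type", string(corev1.SecretTypeTLS)).String(),
	}
	fmt.Println(opts.FieldSelector) // prints: type=kubernetes.io/tls
}
```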
diff --git a/plugins/inputs/kube_inventory/kube_inventory.go b/plugins/inputs/kube_inventory/kube_inventory.go
index 2f4e81b8f..2ba0bc7d5 100644
--- a/plugins/inputs/kube_inventory/kube_inventory.go
+++ b/plugins/inputs/kube_inventory/kube_inventory.go
@@ -112,6 +112,8 @@ var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){
 	"statefulsets":           collectStatefulSets,
 	"persistentvolumes":      collectPersistentVolumes,
 	"persistentvolumeclaims": collectPersistentVolumeClaims,
+	"resourcequotas":         collectResourceQuotas,
+	"secrets":                collectSecrets,
 }
 
 func atoi(s string) int64 {
@@ -159,6 +161,8 @@ var (
 	podContainerMeasurement  = "kubernetes_pod_container"
 	serviceMeasurement       = "kubernetes_service"
 	statefulSetMeasurement   = "kubernetes_statefulset"
+	resourcequotaMeasurement = "kubernetes_resourcequota"
+	certificateMeasurement   = "kubernetes_certificate"
 )
 
 func init() {
diff --git a/plugins/inputs/kube_inventory/node.go b/plugins/inputs/kube_inventory/node.go
index b46b4e620..2fc7663f5 100644
--- a/plugins/inputs/kube_inventory/node.go
+++ b/plugins/inputs/kube_inventory/node.go
@@ -14,15 +14,27 @@ func collectNodes(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
 		acc.AddError(err)
 		return
 	}
+
+	ki.gatherNodeCount(len(list.Items), acc)
+
 	for _, n := range list.Items {
 		ki.gatherNode(n, acc)
 	}
 }
 
+func (ki *KubernetesInventory) gatherNodeCount(count int, acc telegraf.Accumulator) {
+	fields := map[string]interface{}{"node_count": count}
+	tags := map[string]string{}
+
+	acc.AddFields(nodeMeasurement, fields, tags)
+}
+
 func (ki *KubernetesInventory) gatherNode(n corev1.Node, acc telegraf.Accumulator) {
 	fields := map[string]interface{}{}
 	tags := map[string]string{
-		"node_name": n.Name,
+		"node_name":         n.Name,
+		"cluster_namespace": n.Annotations["cluster.x-k8s.io/cluster-namespace"],
+		"version":           n.Status.NodeInfo.KubeletVersion,
 	}
 
 	for resourceName, val := range n.Status.Capacity {
@@ -49,5 +61,38 @@ func (ki *KubernetesInventory) gatherNode(n corev1.Node, acc telegraf.Accumulator) {
 		}
 	}
 
+	for _, val := range n.Status.Conditions {
+		conditionfields := map[string]interface{}{}
+		conditiontags := map[string]string{
+			"status":    string(val.Status),
+			"condition": string(val.Type),
+		}
+		for k, v := range tags {
+			conditiontags[k] = v
+		}
+		running := 0
+		nodeready := 0
+		if val.Status == "True" {
+			if val.Type == "Ready" {
+				nodeready = 1
+			}
+			running = 1
+		} else if val.Status == "Unknown" {
+			if val.Type == "Ready" {
+				nodeready = 0
+			}
+			running = 2
+		}
+		conditionfields["status_condition"] = running
+		conditionfields["ready"] = nodeready
+		acc.AddFields(nodeMeasurement, conditionfields, conditiontags)
+	}
+
+	unschedulable := 0
+	if n.Spec.Unschedulable {
+		unschedulable = 1
+	}
+	fields["spec_unschedulable"] = unschedulable
+
 	acc.AddFields(nodeMeasurement, fields, tags)
 }
diff --git a/plugins/inputs/kube_inventory/node_test.go b/plugins/inputs/kube_inventory/node_test.go
index 02f330a7d..5555558f7 100644
--- a/plugins/inputs/kube_inventory/node_test.go
+++ b/plugins/inputs/kube_inventory/node_test.go
@@ -16,7 +16,6 @@ import (
 func TestNode(t *testing.T) {
 	cli := &client{}
 	now := time.Now()
-	created := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 1, 36, 0, now.Location())
 
 	tests := []struct {
 		name     string
@@ -31,6 +30,16 @@ func TestNode(t *testing.T) {
 					"/nodes/": corev1.NodeList{},
 				},
 			},
+			output: []telegraf.Metric{
+				testutil.MustMetric(
+					nodeMeasurement,
+					map[string]string{},
+					map[string]interface{}{
+						"node_count": int64(0),
+					},
+					time.Unix(0, 0),
+				),
+			},
 			hasError: false,
 		},
 		{
@@ -66,8 +75,7 @@ func TestNode(t *testing.T) {
 						"pods":   resource.MustParse("110"),
 					},
 					Conditions: []corev1.NodeCondition{
-						{Type: "Ready", Status: "true", LastTransitionTime: metav1.Time{Time: now}},
-						{Type: "OutOfDisk", Status: "false", LastTransitionTime: metav1.Time{Time: created}},
+						{Type: "Ready", Status: "True", LastTransitionTime: metav1.Time{Time: now}},
 					},
 				},
 				Spec: corev1.NodeSpec{
@@ -87,12 +95,15 @@ func TestNode(t *testing.T) {
 				},
 				ObjectMeta: metav1.ObjectMeta{
 					Generation: 11232,
-					Namespace:  "ns1",
 					Name:       "node1",
 					Labels: map[string]string{
 						"lab1": "v1",
 						"lab2": "v2",
 					},
+					Namespace: "ns1",
+					Annotations: map[string]string{
+						"cluster.x-k8s.io/cluster-namespace": "ns1",
+					},
 					CreationTimestamp: metav1.Time{Time: now},
 				},
 			},
@@ -104,7 +115,24 @@ func TestNode(t *testing.T) {
 			testutil.MustMetric(
 				nodeMeasurement,
 				map[string]string{
-					"node_name": "node1",
+					"node_name":         "node1",
+					"cluster_namespace": "ns1",
+					"condition":         "Ready",
+					"status":            "True",
+					"version":           "v1.10.3",
+				},
+				map[string]interface{}{
+					"status_condition": int64(1),
+					"ready":            int64(1),
+				},
+				time.Unix(0, 0),
+			),
+			testutil.MustMetric(
+				nodeMeasurement,
+				map[string]string{
+					"node_name":         "node1",
+					"cluster_namespace": "ns1",
+					"version":           "v1.10.3",
 				},
 				map[string]interface{}{
 					"capacity_cpu_cores":         int64(16),
@@ -115,6 +143,7 @@ func TestNode(t *testing.T) {
 					"allocatable_millicpu_cores": int64(1000),
 					"allocatable_memory_bytes":   int64(1.28732676096e+11),
 					"allocatable_pods":           int64(110),
+					"spec_unschedulable":         int64(0),
 				},
 				time.Unix(0, 0),
 			),
@@ -141,6 +170,10 @@ func TestNode(t *testing.T) {
 
 		// No error case
 		require.NoErrorf(t, err, "%s failed, err: %v", v.name, err)
+		if v.name == "no nodes" {
+			nodeCount := len((v.handler.responseMap["/nodes/"]).(corev1.NodeList).Items)
+			ks.gatherNodeCount(nodeCount, acc)
+		}
 
 		require.Len(t, acc.Metrics, len(v.output))
 		testutil.RequireMetricsEqual(t, acc.GetTelegrafMetrics(), v.output, testutil.IgnoreTime())
 	}
+ conditionfields["ready"] = podready + acc.AddFields(podContainerMeasurement, conditionfields, conditiontags) + } + acc.AddFields(podContainerMeasurement, fields, tags) } diff --git a/plugins/inputs/kube_inventory/pod_test.go b/plugins/inputs/kube_inventory/pod_test.go index 962805a67..cba35c689 100644 --- a/plugins/inputs/kube_inventory/pod_test.go +++ b/plugins/inputs/kube_inventory/pod_test.go @@ -213,6 +213,57 @@ func TestPod(t *testing.T) { }, }, output: []telegraf.Metric{ + testutil.MustMetric( + podContainerMeasurement, + map[string]string{ + "pod_name": "pod1", + "condition": "Initialized", + "status": "True", + "image": "image1", + "node_name": "node1", + "namespace": "ns1", + "container_name": "running", + }, + map[string]interface{}{ + "status_condition": int64(1), + "ready": int64(0), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + podContainerMeasurement, + map[string]string{ + "pod_name": "pod1", + "condition": "Ready", + "status": "True", + "image": "image1", + "node_name": "node1", + "namespace": "ns1", + "container_name": "running", + }, + map[string]interface{}{ + "status_condition": int64(1), + "ready": int64(1), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + podContainerMeasurement, + map[string]string{ + "pod_name": "pod1", + "condition": "Scheduled", + "status": "True", + "image": "image1", + "node_name": "node1", + "namespace": "ns1", + "container_name": "running", + }, + map[string]interface{}{ + "status_condition": int64(1), + "ready": int64(0), + }, + time.Unix(0, 0), + ), testutil.MustMetric( podContainerMeasurement, map[string]string{ @@ -220,6 +271,7 @@ func TestPod(t *testing.T) { "container_name": "running", "node_name": "node1", "pod_name": "pod1", + "image": "image1", "phase": "Running", "state": "running", "readiness": "ready", @@ -234,11 +286,63 @@ func TestPod(t *testing.T) { }, time.Unix(0, 0), ), + testutil.MustMetric( + podContainerMeasurement, + map[string]string{ + "pod_name": "pod1", + "condition": "Initialized", + "status": "True", + "image": "image1", + "node_name": "node1", + "namespace": "ns1", + "container_name": "completed", + }, + map[string]interface{}{ + "status_condition": int64(1), + "ready": int64(0), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + podContainerMeasurement, + map[string]string{ + "pod_name": "pod1", + "condition": "Ready", + "status": "True", + "image": "image1", + "node_name": "node1", + "namespace": "ns1", + "container_name": "completed", + }, + map[string]interface{}{ + "status_condition": int64(1), + "ready": int64(1), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + podContainerMeasurement, + map[string]string{ + "pod_name": "pod1", + "condition": "Scheduled", + "status": "True", + "image": "image1", + "node_name": "node1", + "namespace": "ns1", + "container_name": "completed", + }, + map[string]interface{}{ + "status_condition": int64(1), + "ready": int64(0), + }, + time.Unix(0, 0), + ), testutil.MustMetric( podContainerMeasurement, map[string]string{ "namespace": "ns1", "container_name": "completed", + "image": "image1", "node_name": "node1", "pod_name": "pod1", "phase": "Running", @@ -257,12 +361,64 @@ func TestPod(t *testing.T) { }, time.Unix(0, 0), ), + testutil.MustMetric( + podContainerMeasurement, + map[string]string{ + "pod_name": "pod1", + "condition": "Initialized", + "status": "True", + "image": "image1", + "node_name": "node1", + "namespace": "ns1", + "container_name": "waiting", + }, + map[string]interface{}{ + "status_condition": int64(1), + "ready": int64(0), + }, + 
diff --git a/plugins/inputs/kube_inventory/pod_test.go b/plugins/inputs/kube_inventory/pod_test.go
index 962805a67..cba35c689 100644
--- a/plugins/inputs/kube_inventory/pod_test.go
+++ b/plugins/inputs/kube_inventory/pod_test.go
@@ -213,6 +213,57 @@ func TestPod(t *testing.T) {
 				},
 			},
 			output: []telegraf.Metric{
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Initialized",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "running",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Ready",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "running",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(1),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Scheduled",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "running",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
 				testutil.MustMetric(
 					podContainerMeasurement,
 					map[string]string{
@@ -220,6 +271,7 @@ func TestPod(t *testing.T) {
 						"namespace":      "ns1",
 						"container_name": "running",
 						"node_name":      "node1",
 						"pod_name":       "pod1",
+						"image":          "image1",
 						"phase":          "Running",
 						"state":          "running",
 						"readiness":      "ready",
@@ -234,11 +286,63 @@ func TestPod(t *testing.T) {
 					},
 					time.Unix(0, 0),
 				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Initialized",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "completed",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Ready",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "completed",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(1),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Scheduled",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "completed",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
 				testutil.MustMetric(
 					podContainerMeasurement,
 					map[string]string{
 						"namespace":      "ns1",
 						"container_name": "completed",
+						"image":          "image1",
 						"node_name":      "node1",
 						"pod_name":       "pod1",
 						"phase":          "Running",
@@ -257,12 +361,64 @@ func TestPod(t *testing.T) {
 					},
 					time.Unix(0, 0),
 				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Initialized",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "waiting",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Ready",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "waiting",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(1),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Scheduled",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "waiting",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
 				testutil.MustMetric(
 					podContainerMeasurement,
 					map[string]string{
 						"namespace":      "ns1",
 						"container_name": "waiting",
 						"node_name":      "node1",
+						"image":          "image1",
 						"pod_name":       "pod1",
 						"phase":          "Running",
 						"state":          "waiting",
@@ -676,6 +832,57 @@ func TestPodPendingContainers(t *testing.T) {
 				},
 			},
 			output: []telegraf.Metric{
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Initialized",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "waiting",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Ready",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "waiting",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(1),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Scheduled",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "waiting",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
 				testutil.MustMetric(
 					podContainerMeasurement,
 					map[string]string{
@@ -683,6 +890,7 @@ func TestPodPendingContainers(t *testing.T) {
 						"namespace":      "ns1",
 						"container_name": "waiting",
 						"node_name":      "node1",
 						"pod_name":       "pod1",
+						"image":          "image1",
 						"phase":          "Pending",
 						"state":          "unknown",
 						"readiness":      "unready",
@@ -698,6 +906,57 @@ func TestPodPendingContainers(t *testing.T) {
 					},
 					time.Unix(0, 0),
 				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Initialized",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "terminated",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Ready",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "terminated",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(1),
+					},
+					time.Unix(0, 0),
+				),
+				testutil.MustMetric(
+					podContainerMeasurement,
+					map[string]string{
+						"pod_name":       "pod1",
+						"condition":      "Scheduled",
+						"status":         "True",
+						"image":          "image1",
+						"node_name":      "node1",
+						"namespace":      "ns1",
+						"container_name": "terminated",
+					},
+					map[string]interface{}{
+						"status_condition": int64(1),
+						"ready":            int64(0),
+					},
+					time.Unix(0, 0),
+				),
 				testutil.MustMetric(
 					podContainerMeasurement,
 					map[string]string{
@@ -705,6 +964,7 @@ func TestPodPendingContainers(t *testing.T) {
 						"namespace":      "ns1",
 						"container_name": "terminated",
 						"node_name":      "node1",
 						"pod_name":       "pod1",
+						"image":          "image1",
 						"phase":          "Pending",
 						"state":          "unknown",
 						"readiness":      "unready",
diff --git a/plugins/inputs/kube_inventory/resourcequotas.go b/plugins/inputs/kube_inventory/resourcequotas.go
new file mode 100644
index 000000000..015d4dd84
--- /dev/null
+++ b/plugins/inputs/kube_inventory/resourcequotas.go
@@ -0,0 +1,77 @@
+package kube_inventory
+
+import (
+	"context"
+	"strings"
+
+	corev1 "k8s.io/api/core/v1"
+
+	"github.com/influxdata/telegraf"
+)
+
+func collectResourceQuotas(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
+	list, err := ki.client.getResourceQuotas(ctx)
+	if err != nil {
+		acc.AddError(err)
+		return
+	}
+	for _, i := range list.Items {
+		ki.gatherResourceQuota(i, acc)
+	}
+}
+
+func (ki *KubernetesInventory) gatherResourceQuota(r corev1.ResourceQuota, acc telegraf.Accumulator) {
+	fields := map[string]interface{}{}
+	tags := map[string]string{
+		"resource":  r.Name,
+		"namespace": r.Namespace,
+	}
+
+	for resourceName, val := range r.Status.Hard {
+		switch resourceName {
+		case "cpu", "limits.cpu", "requests.cpu":
+			key := "hard_cpu"
+			if strings.Contains(string(resourceName), "limits") {
+				key = key + "_limits"
+			} else if strings.Contains(string(resourceName), "requests") {
+				key = key + "_requests"
+			}
+			fields[key] = ki.convertQuantity(val.String(), 1)
+		case "memory", "limits.memory", "requests.memory":
+			key := "hard_memory"
+			if strings.Contains(string(resourceName), "limits") {
+				key = key + "_limits"
+			} else if strings.Contains(string(resourceName), "requests") {
+				key = key + "_requests"
+			}
+			fields[key] = ki.convertQuantity(val.String(), 1)
+		case "pods":
+			fields["hard_pods"] = atoi(val.String())
+		}
+	}
+
+	for resourceName, val := range r.Status.Used {
+		switch resourceName {
+		case "cpu", "requests.cpu", "limits.cpu":
+			key := "used_cpu"
+			if strings.Contains(string(resourceName), "limits") {
+				key = key + "_limits"
+			} else if strings.Contains(string(resourceName), "requests") {
+				key = key + "_requests"
+			}
+			fields[key] = ki.convertQuantity(val.String(), 1)
+		case "memory", "requests.memory", "limits.memory":
+			key := "used_memory"
+			if strings.Contains(string(resourceName), "limits") {
+				key = key + "_limits"
+			} else if strings.Contains(string(resourceName), "requests") {
+				key = key + "_requests"
+			}
+			fields[key] = ki.convertQuantity(val.String(), 1)
+		case "pods":
+			fields["used_pods"] = atoi(val.String())
+		}
+	}
+
+	acc.AddFields(resourcequotaMeasurement, fields, tags)
+}
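The hard/used switches in `gatherResourceQuota` above repeat the same limits/requests suffix logic four times. A sketch of a helper that would produce identical field keys for the names the switch matches; `quotaKey` is hypothetical, not part of the patch:

```go
package main

import (
	"fmt"
	"strings"
)

// quotaKey maps a quota resource name plus a "hard"/"used" prefix to the
// field key the switch statements above emit, e.g.
// ("hard", "limits.cpu") -> "hard_cpu_limits".
func quotaKey(prefix, resourceName string) string {
	base := prefix
	switch {
	case strings.HasSuffix(resourceName, "cpu"):
		base += "_cpu"
	case strings.HasSuffix(resourceName, "memory"):
		base += "_memory"
	}
	switch {
	case strings.HasPrefix(resourceName, "limits."):
		base += "_limits"
	case strings.HasPrefix(resourceName, "requests."):
		base += "_requests"
	}
	return base
}

func main() {
	for _, name := range []string{"cpu", "limits.cpu", "requests.memory"} {
		fmt.Println(name, "->", quotaKey("hard", name))
	}
}
```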
diff --git a/plugins/inputs/kube_inventory/resourcequotas_test.go b/plugins/inputs/kube_inventory/resourcequotas_test.go
new file mode 100644
index 000000000..8b985141c
--- /dev/null
+++ b/plugins/inputs/kube_inventory/resourcequotas_test.go
@@ -0,0 +1,113 @@
+package kube_inventory
+
+import (
+	"testing"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestResourceQuota(t *testing.T) {
+	cli := &client{}
+	now := time.Now()
+
+	tests := []struct {
+		name     string
+		handler  *mockHandler
+		output   []telegraf.Metric
+		hasError bool
+	}{
+		{
+			name: "no resourcequota",
+			handler: &mockHandler{
+				responseMap: map[string]interface{}{
+					"/resourcequotas/": corev1.ResourceQuotaList{},
+				},
+			},
+			output:   []telegraf.Metric{},
+			hasError: false,
+		},
+		{
+			name: "collect resourcequota",
+			handler: &mockHandler{
+				responseMap: map[string]interface{}{
+					"/resourcequotas/": corev1.ResourceQuotaList{
+						Items: []corev1.ResourceQuota{
+							{
+								Status: corev1.ResourceQuotaStatus{
+									Hard: corev1.ResourceList{
+										"cpu":    resource.MustParse("16"),
+										"memory": resource.MustParse("125817904Ki"),
+										"pods":   resource.MustParse("110"),
+									},
+									Used: corev1.ResourceList{
+										"cpu":    resource.MustParse("10"),
+										"memory": resource.MustParse("125715504Ki"),
+										"pods":   resource.MustParse("0"),
+									},
+								},
+								ObjectMeta: metav1.ObjectMeta{
+									Generation: 11232,
+									Namespace:  "ns1",
+									Name:       "rq1",
+									Labels: map[string]string{
+										"lab1": "v1",
+										"lab2": "v2",
+									},
+									CreationTimestamp: metav1.Time{Time: now},
+								},
+							},
+						},
+					},
+				},
+			},
+			output: []telegraf.Metric{
+				testutil.MustMetric(
+					resourcequotaMeasurement,
+					map[string]string{
+						"resource":  "rq1",
+						"namespace": "ns1",
+					},
+					map[string]interface{}{
+						"hard_cpu":    int64(16),
+						"hard_memory": int64(1.28837533696e+11),
+						"hard_pods":   int64(110),
+						"used_cpu":    int64(10),
+						"used_memory": int64(1.28732676096e+11),
+						"used_pods":   int64(0),
+					},
+					time.Unix(0, 0),
+				),
+			},
+			hasError: false,
+		},
+	}
+
+	for _, v := range tests {
+		ks := &KubernetesInventory{
+			client: cli,
+		}
+		acc := new(testutil.Accumulator)
+		for _, quota := range ((v.handler.responseMap["/resourcequotas/"]).(corev1.ResourceQuotaList)).Items {
+			ks.gatherResourceQuota(quota, acc)
+		}
+
+		err := acc.FirstError()
+		if v.hasError {
+			require.Errorf(t, err, "%s failed, should have error", v.name)
+			continue
+		}
+
+		// No error case
+		require.NoErrorf(t, err, "%s failed, err: %v", v.name, err)
+
+		require.Len(t, acc.Metrics, len(v.output))
+		testutil.RequireMetricsEqual(t, acc.GetTelegrafMetrics(), v.output, testutil.IgnoreTime())
+	}
+}
diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go
index bf8f4bff1..56153ccb4 100644
--- a/plugins/inputs/kubernetes/kubernetes.go
+++ b/plugins/inputs/kubernetes/kubernetes.go
@@ -221,16 +221,14 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
 	acc.AddFields("kubernetes_node", fields, tags)
 }
 
-func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) {
+func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Item, error) {
 	var podAPI Pods
 	err := k.LoadJSON(fmt.Sprintf("%s/pods", baseURL), &podAPI)
 	if err != nil {
 		return nil, err
 	}
-	podInfos := make([]Metadata, 0, len(podAPI.Items))
-	for _, podMetadata := range podAPI.Items {
-		podInfos = append(podInfos, podMetadata.Metadata)
-	}
+	podInfos := make([]Item, 0, len(podAPI.Items))
+	podInfos = append(podInfos, podAPI.Items...)
 	return podInfos, nil
 }
 
@@ -286,12 +284,16 @@ func (k *Kubernetes) LoadJSON(url string, v interface{}) error {
 	return nil
 }
 
-func buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) {
+func buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []Item, labelFilter filter.Filter, acc telegraf.Accumulator) {
 	for _, pod := range summaryMetrics.Pods {
 		podLabels := make(map[string]string)
+		containerImages := make(map[string]string)
 		for _, info := range podInfo {
-			if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace {
-				for k, v := range info.Labels {
+			if info.Metadata.Name == pod.PodRef.Name && info.Metadata.Namespace == pod.PodRef.Namespace {
+				for _, v := range info.Spec.Containers {
+					containerImages[v.Name] = v.Image
+				}
+				for k, v := range info.Metadata.Labels {
 					if labelFilter.Match(k) {
 						podLabels[k] = v
 					}
@@ -306,6 +308,15 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []Item, labelFilter filter.Filter, acc telegraf.Accumulator) {
 			"container_name": container.Name,
 			"pod_name":       pod.PodRef.Name,
 		}
+		for k, v := range containerImages {
+			if k == container.Name {
+				tags["image"] = v
+				tok := strings.Split(v, ":")
+				if len(tok) == 2 {
+					tags["version"] = tok[1]
+				}
+			}
+		}
 		for k, v := range podLabels {
 			tags[k] = v
 		}
diff --git a/plugins/inputs/kubernetes/kubernetes_pods.go b/plugins/inputs/kubernetes/kubernetes_pods.go
index 29d5e7789..83c267ec7 100644
--- a/plugins/inputs/kubernetes/kubernetes_pods.go
+++ b/plugins/inputs/kubernetes/kubernetes_pods.go
@@ -8,6 +8,7 @@ type Pods struct {
 
 type Item struct {
 	Metadata Metadata `json:"metadata"`
+	Spec     Spec     `json:"spec"`
 }
 
 type Metadata struct {
@@ -15,3 +16,12 @@ type Metadata struct {
 	Namespace string            `json:"namespace"`
 	Labels    map[string]string `json:"labels"`
 }
+
+type Spec struct {
+	Containers []Container `json:"containers"`
+}
+
+type Container struct {
+	Name  string `json:"name"`
+	Image string `json:"image"`
+}
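The new `Spec`/`Container` types exist so `gatherPodInfo` can surface each container's image alongside the pod metadata. A self-contained sketch decoding a hand-written `/pods` item with local copies of these structs; the JSON literal and values are illustrative only:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Local copies of the types added in kubernetes_pods.go above.
type Item struct {
	Metadata Metadata `json:"metadata"`
	Spec     Spec     `json:"spec"`
}

type Metadata struct {
	Name      string            `json:"name"`
	Namespace string            `json:"namespace"`
	Labels    map[string]string `json:"labels"`
}

type Spec struct {
	Containers []Container `json:"containers"`
}

type Container struct {
	Name  string `json:"name"`
	Image string `json:"image"`
}

func main() {
	// A hand-written stand-in for one item of the kubelet /pods response.
	data := []byte(`{
		"metadata": {"name": "tick1", "namespace": "default"},
		"spec": {"containers": [{"name": "telegraf", "image": "telegraf:1.27"}]}
	}`)

	var item Item
	if err := json.Unmarshal(data, &item); err != nil {
		panic(err)
	}
	for _, c := range item.Spec.Containers {
		// Mirrors the image/version tagging added in buildPodMetrics.
		tok := strings.Split(c.Image, ":")
		fmt.Printf("container=%s image=%s", c.Name, tok[0])
		if len(tok) == 2 {
			fmt.Printf(" version=%s", tok[1])
		}
		fmt.Println()
	}
}
```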