feat(inputs.kubernetes): Extend kube_inventory plugin to include and extend resourcequota, secrets, node, and pod measurement (#13040)

This commit is contained in:
varunjain0606 2023-05-19 16:54:09 +03:00 committed by GitHub
parent ad4df2105c
commit 1b74a25252
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 773 additions and 15 deletions

View File

@ -13,6 +13,7 @@ Kubernetes resources:
- pods (containers)
- services
- statefulsets
- resourcequotas
Kubernetes is a fast moving project, with a new minor release every 3 months.
As such, we will aim to maintain support only for versions that are supported
@ -240,6 +241,9 @@ tls_key = "/run/telegraf-kubernetes-key"
- kubernetes_node
- tags:
- node_name
- status
- condition
- cluster_namespace
- fields:
- capacity_cpu_cores
- capacity_millicpu_cores
@ -249,6 +253,9 @@ tls_key = "/run/telegraf-kubernetes-key"
- allocatable_millicpu_cores
- allocatable_memory_bytes
- allocatable_pods
- status_condition
- spec_unschedulable
- node_count
- kubernetes_persistentvolume
- tags:
@ -278,6 +285,7 @@ tls_key = "/run/telegraf-kubernetes-key"
- phase
- state
- readiness
- condition
- fields:
- restarts_total
- state_code
@ -288,6 +296,7 @@ tls_key = "/run/telegraf-kubernetes-key"
- resource_requests_memory_bytes
- resource_limits_millicpu_units
- resource_limits_memory_bytes
- status_condition
- kubernetes_service
- tags:
@ -319,6 +328,49 @@ tls_key = "/run/telegraf-kubernetes-key"
- spec_replicas
- observed_generation
- kubernetes_resourcequota
- tags:
- resource
- namespace
- fields:
- hard_cpu_limits
- hard_cpu_requests
    - hard_memory_limits
- hard_memory_requests
- hard_pods
- used_cpu_limits
- used_cpu_requests
- used_memory_limits
- used_memory_requests
- used_pods
- kubernetes_certificate
- tags:
- common_name
- signature_algorithm
- public_key_algorithm
- issuer_common_name
- san
- verification
- name
- namespace
- fields:
- age
- expiry
- startdate
- enddate
- verification_code
### kubernetes node status `status`
The node `ready` status can take one of three different values.
| Tag value | Corresponding field value | Meaning |
| --------- | ------------------------- | ------- |
| ready | 0 | NotReady|
| ready | 1 | Ready |
| ready | 2 | Unknown |
### pv `phase_type`
The persistentvolume "phase" is saved in the `phase` tag with a correlated
@ -351,11 +403,16 @@ numeric field called `phase_type` corresponding with that tag value.
kubernetes_configmap,configmap_name=envoy-config,namespace=default,resource_version=56593031 created=1544103867000000000i 1547597616000000000
kubernetes_daemonset,daemonset_name=telegraf,selector_select1=s1,namespace=logging number_unavailable=0i,desired_number_scheduled=11i,number_available=11i,number_misscheduled=8i,number_ready=11i,updated_number_scheduled=11i,created=1527758699000000000i,generation=16i,current_number_scheduled=11i 1547597616000000000
kubernetes_deployment,deployment_name=deployd,selector_select1=s1,namespace=default replicas_unavailable=0i,created=1544103082000000000i,replicas_available=1i 1547597616000000000
kubernetes_node,node_name=ip-172-17-0-2.internal allocatable_pods=110i,capacity_memory_bytes=128837533696,capacity_pods=110i,capacity_cpu_cores=16i,allocatable_cpu_cores=16i,allocatable_memory_bytes=128732676096 1547597616000000000
kubernetes_node,host=vjain node_count=8i 1628918652000000000
kubernetes_node,condition=Ready,host=vjain,node_name=ip-172-17-0-2.internal,status=True status_condition=1i 1629177980000000000
kubernetes_node,cluster_namespace=tools,condition=Ready,host=vjain,node_name=ip-172-17-0-2.internal,status=True allocatable_cpu_cores=4i,allocatable_memory_bytes=7186567168i,allocatable_millicpu_cores=4000i,allocatable_pods=110i,capacity_cpu_cores=4i,capacity_memory_bytes=7291424768i,capacity_millicpu_cores=4000i,capacity_pods=110i,spec_unschedulable=0i,status_condition=1i 1628918652000000000
kubernetes_resourcequota,host=vjain,namespace=default,resource=pods-high hard_cpu=1000i,hard_memory=214748364800i,hard_pods=10i,used_cpu=0i,used_memory=0i,used_pods=0i 1629110393000000000
kubernetes_resourcequota,host=vjain,namespace=default,resource=pods-low hard_cpu=5i,hard_memory=10737418240i,hard_pods=10i,used_cpu=0i,used_memory=0i,used_pods=0i 1629110393000000000
kubernetes_persistentvolume,phase=Released,pv_name=pvc-aaaaaaaa-bbbb-cccc-1111-222222222222,storageclass=ebs-1-retain phase_type=3i 1547597616000000000
kubernetes_persistentvolumeclaim,namespace=default,phase=Bound,pvc_name=data-etcd-0,selector_select1=s1,storageclass=ebs-1-retain phase_type=0i 1547597615000000000
kubernetes_pod,namespace=default,node_name=ip-172-17-0-2.internal,pod_name=tick1 last_transition_time=1547578322000000000i,ready="false" 1547597616000000000
kubernetes_service,cluster_ip=172.29.61.80,namespace=redis-cache-0001,port_name=redis,port_protocol=TCP,selector_app=myapp,selector_io.kompose.service=redis,selector_role=slave,service_name=redis-slave created=1588690034000000000i,generation=0i,port=6379i,target_port=0i 1547597616000000000
kubernetes_pod_container,condition=Ready,host=vjain,pod_name=uefi-5997f76f69-xzljt,status=True status_condition=1i 1629177981000000000
kubernetes_pod_container,container_name=telegraf,namespace=default,node_name=ip-172-17-0-2.internal,node_selector_node-role.kubernetes.io/compute=true,pod_name=tick1,phase=Running,state=running,readiness=ready resource_requests_cpu_units=0.1,resource_limits_memory_bytes=524288000,resource_limits_cpu_units=0.5,restarts_total=0i,state_code=0i,state_reason="",phase_reason="",resource_requests_memory_bytes=524288000 1547597616000000000
kubernetes_statefulset,namespace=default,selector_select1=s1,statefulset_name=etcd replicas_updated=3i,spec_replicas=3i,observed_generation=1i,created=1544101669000000000i,generation=1i,replicas=3i,replicas_current=3i,replicas_ready=3i 1547597616000000000
```

View File

@ -0,0 +1,94 @@
package kube_inventory
import (
"context"
"crypto/x509"
"encoding/pem"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"github.com/influxdata/telegraf"
)
// collectSecrets fetches every TLS secret visible to the client and emits
// one certificate measurement per secret via gatherCertificates. A listing
// error is reported through the accumulator and aborts the collection.
func collectSecrets(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
	secrets, err := ki.client.getTLSSecrets(ctx)
	if err != nil {
		acc.AddError(err)
		return
	}
	for idx := range secrets.Items {
		ki.gatherCertificates(secrets.Items[idx], acc)
	}
}
func getFields(cert *x509.Certificate, now time.Time) map[string]interface{} {
age := int(now.Sub(cert.NotBefore).Seconds())
expiry := int(cert.NotAfter.Sub(now).Seconds())
startdate := cert.NotBefore.Unix()
enddate := cert.NotAfter.Unix()
fields := map[string]interface{}{
"age": age,
"expiry": expiry,
"startdate": startdate,
"enddate": enddate,
}
return fields
}
func getTags(cert *x509.Certificate) map[string]string {
tags := map[string]string{
"common_name": cert.Subject.CommonName,
"signature_algorithm": cert.SignatureAlgorithm.String(),
"public_key_algorithm": cert.PublicKeyAlgorithm.String(),
}
tags["issuer_common_name"] = cert.Issuer.CommonName
san := append(cert.DNSNames, cert.EmailAddresses...)
for _, ip := range cert.IPAddresses {
san = append(san, ip.String())
}
for _, uri := range cert.URIs {
san = append(san, uri.String())
}
tags["san"] = strings.Join(san, ",")
return tags
}
// gatherCertificates parses the "tls.crt" entry of a TLS secret, tags it with
// certificate identity plus the secret's name and namespace, verifies the
// chain, and emits one certificate measurement.
func (ki *KubernetesInventory) gatherCertificates(r corev1.Secret, acc telegraf.Accumulator) {
	now := time.Now()
	for resourceName, val := range r.Data {
		// Only the certificate entry is of interest; skip other keys
		// such as tls.key.
		if resourceName != "tls.crt" {
			continue
		}
		// NOTE(review): PEM/parse failures return silently instead of
		// reporting through acc.AddError — confirm that is intentional.
		block, _ := pem.Decode(val)
		if block == nil {
			return
		}
		cert, err := x509.ParseCertificate(block.Bytes)
		if err != nil {
			return
		}
		fields := getFields(cert, now)
		tags := getTags(cert)
		tags["name"] = r.Name
		tags["namespace"] = r.Namespace
		// Verify the chain accepting any extended key usage; the
		// outcome is exposed both as a tag and as a numeric field.
		opts := x509.VerifyOptions{
			KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageAny},
		}
		_, err = cert.Verify(opts)
		if err == nil {
			tags["verification"] = "valid"
			fields["verification_code"] = 0
		} else {
			tags["verification"] = "invalid"
			fields["verification_code"] = 1
		}
		acc.AddFields(certificateMeasurement, fields, tags)
	}
}

View File

@ -8,6 +8,7 @@ import (
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@ -120,3 +121,18 @@ func (c *client) getStatefulSets(ctx context.Context) (*appsv1.StatefulSetList,
defer cancel()
return c.AppsV1().StatefulSets(c.namespace).List(ctx, metav1.ListOptions{})
}
// getResourceQuotas lists the resource quotas in the configured namespace,
// bounded by the client's request timeout.
func (c *client) getResourceQuotas(ctx context.Context) (*corev1.ResourceQuotaList, error) {
	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()
	return c.CoreV1().ResourceQuotas(c.namespace).List(ctx, metav1.ListOptions{})
}
// getTLSSecrets lists the secrets in the configured namespace whose type is
// kubernetes.io/tls, bounded by the client's request timeout.
// NOTE(review): the selector is built as a LabelSelector but rendered into
// ListOptions.FieldSelector; the resulting string "type=kubernetes.io/tls"
// is a valid field selector for secrets, but confirm a field (not label)
// selector is the intent here.
func (c *client) getTLSSecrets(ctx context.Context) (*corev1.SecretList, error) {
	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()
	labelSelector := metav1.LabelSelector{MatchLabels: map[string]string{"type": "kubernetes.io/tls"}}
	return c.CoreV1().Secrets(c.namespace).List(ctx, metav1.ListOptions{
		FieldSelector: labels.Set(labelSelector.MatchLabels).String(),
	})
}

View File

@ -112,6 +112,8 @@ var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accu
"statefulsets": collectStatefulSets,
"persistentvolumes": collectPersistentVolumes,
"persistentvolumeclaims": collectPersistentVolumeClaims,
"resourcequotas": collectResourceQuotas,
"secrets": collectSecrets,
}
func atoi(s string) int64 {
@ -159,6 +161,8 @@ var (
podContainerMeasurement = "kubernetes_pod_container"
serviceMeasurement = "kubernetes_service"
statefulSetMeasurement = "kubernetes_statefulset"
resourcequotaMeasurement = "kubernetes_resourcequota"
certificateMeasurement = "kubernetes_certificate"
)
func init() {

View File

@ -14,15 +14,27 @@ func collectNodes(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesI
acc.AddError(err)
return
}
ki.gatherNodeCount(len(list.Items), acc)
for _, n := range list.Items {
ki.gatherNode(n, acc)
}
}
// gatherNodeCount emits a single node measurement, with no tags, carrying
// the total number of nodes in the cluster.
func (ki *KubernetesInventory) gatherNodeCount(count int, acc telegraf.Accumulator) {
	acc.AddFields(
		nodeMeasurement,
		map[string]interface{}{"node_count": count},
		map[string]string{},
	)
}
func (ki *KubernetesInventory) gatherNode(n corev1.Node, acc telegraf.Accumulator) {
fields := map[string]interface{}{}
tags := map[string]string{
"node_name": n.Name,
"node_name": n.Name,
"cluster_namespace": n.Annotations["cluster.x-k8s.io/cluster-namespace"],
"version": n.Status.NodeInfo.KubeletVersion,
}
for resourceName, val := range n.Status.Capacity {
@ -49,5 +61,38 @@ func (ki *KubernetesInventory) gatherNode(n corev1.Node, acc telegraf.Accumulato
}
}
for _, val := range n.Status.Conditions {
conditionfields := map[string]interface{}{}
conditiontags := map[string]string{
"status": string(val.Status),
"condition": string(val.Type),
}
for k, v := range tags {
conditiontags[k] = v
}
running := 0
nodeready := 0
if val.Status == "True" {
if val.Type == "Ready" {
nodeready = 1
}
running = 1
} else if val.Status == "Unknown" {
if val.Type == "Ready" {
nodeready = 0
}
running = 2
}
conditionfields["status_condition"] = running
conditionfields["ready"] = nodeready
acc.AddFields(nodeMeasurement, conditionfields, conditiontags)
}
unschedulable := 0
if n.Spec.Unschedulable {
unschedulable = 1
}
fields["spec_unschedulable"] = unschedulable
acc.AddFields(nodeMeasurement, fields, tags)
}

View File

@ -16,7 +16,6 @@ import (
func TestNode(t *testing.T) {
cli := &client{}
now := time.Now()
created := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 1, 36, 0, now.Location())
tests := []struct {
name string
@ -31,6 +30,16 @@ func TestNode(t *testing.T) {
"/nodes/": corev1.NodeList{},
},
},
output: []telegraf.Metric{
testutil.MustMetric(
nodeMeasurement,
map[string]string{},
map[string]interface{}{
"node_count": int64(0),
},
time.Unix(0, 0),
),
},
hasError: false,
},
{
@ -66,8 +75,7 @@ func TestNode(t *testing.T) {
"pods": resource.MustParse("110"),
},
Conditions: []corev1.NodeCondition{
{Type: "Ready", Status: "true", LastTransitionTime: metav1.Time{Time: now}},
{Type: "OutOfDisk", Status: "false", LastTransitionTime: metav1.Time{Time: created}},
{Type: "Ready", Status: "True", LastTransitionTime: metav1.Time{Time: now}},
},
},
Spec: corev1.NodeSpec{
@ -87,12 +95,15 @@ func TestNode(t *testing.T) {
},
ObjectMeta: metav1.ObjectMeta{
Generation: 11232,
Namespace: "ns1",
Name: "node1",
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
Namespace: "ns1",
Annotations: map[string]string{
"cluster.x-k8s.io/cluster-namespace": "ns1",
},
CreationTimestamp: metav1.Time{Time: now},
},
},
@ -104,7 +115,24 @@ func TestNode(t *testing.T) {
testutil.MustMetric(
nodeMeasurement,
map[string]string{
"node_name": "node1",
"node_name": "node1",
"cluster_namespace": "ns1",
"condition": "Ready",
"status": "True",
"version": "v1.10.3",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(1),
},
time.Unix(0, 0),
),
testutil.MustMetric(
nodeMeasurement,
map[string]string{
"node_name": "node1",
"cluster_namespace": "ns1",
"version": "v1.10.3",
},
map[string]interface{}{
"capacity_cpu_cores": int64(16),
@ -115,6 +143,7 @@ func TestNode(t *testing.T) {
"allocatable_millicpu_cores": int64(1000),
"allocatable_memory_bytes": int64(1.28732676096e+11),
"allocatable_pods": int64(110),
"spec_unschedulable": int64(0),
},
time.Unix(0, 0),
),
@ -141,6 +170,10 @@ func TestNode(t *testing.T) {
// No error case
require.NoErrorf(t, err, "%s failed, err: %v", v.name, err)
if v.name == "no nodes" {
nodeCount := len((v.handler.responseMap["/nodes/"]).(corev1.NodeList).Items)
ks.gatherNodeCount(nodeCount, acc)
}
require.Len(t, acc.Metrics, len(v.output))
testutil.RequireMetricsEqual(t, acc.GetTelegrafMetrics(), v.output, testutil.IgnoreTime())
}

View File

@ -2,6 +2,7 @@ package kube_inventory
import (
"context"
"strings"
corev1 "k8s.io/api/core/v1"
@ -91,6 +92,11 @@ func (ki *KubernetesInventory) gatherPodContainer(p corev1.Pod, cs corev1.Contai
"state": state,
"readiness": readiness,
}
splitImage := strings.Split(c.Image, ":")
if len(splitImage) == 2 {
tags["version"] = splitImage[1]
}
tags["image"] = splitImage[0]
for key, val := range p.Spec.NodeSelector {
if ki.selectorFilter.Match(key) {
tags["node_selector_"+key] = val
@ -117,5 +123,37 @@ func (ki *KubernetesInventory) gatherPodContainer(p corev1.Pod, cs corev1.Contai
}
}
for _, val := range p.Status.Conditions {
conditionfields := map[string]interface{}{}
conditiontags := map[string]string{
"container_name": c.Name,
"image": splitImage[0],
"status": string(val.Status),
"namespace": p.Namespace,
"node_name": p.Spec.NodeName,
"pod_name": p.Name,
"condition": string(val.Type),
}
if len(splitImage) == 2 {
conditiontags["version"] = splitImage[1]
}
running := 0
podready := 0
if val.Status == "True" {
if val.Type == "Ready" {
podready = 1
}
running = 1
} else if val.Status == "Unknown" {
if val.Type == "Ready" {
podready = 0
}
running = 2
}
conditionfields["status_condition"] = running
conditionfields["ready"] = podready
acc.AddFields(podContainerMeasurement, conditionfields, conditiontags)
}
acc.AddFields(podContainerMeasurement, fields, tags)
}

View File

@ -213,6 +213,57 @@ func TestPod(t *testing.T) {
},
},
output: []telegraf.Metric{
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Initialized",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "running",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Ready",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "running",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(1),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Scheduled",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "running",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
@ -220,6 +271,7 @@ func TestPod(t *testing.T) {
"container_name": "running",
"node_name": "node1",
"pod_name": "pod1",
"image": "image1",
"phase": "Running",
"state": "running",
"readiness": "ready",
@ -234,11 +286,63 @@ func TestPod(t *testing.T) {
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Initialized",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "completed",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Ready",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "completed",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(1),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Scheduled",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "completed",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"namespace": "ns1",
"container_name": "completed",
"image": "image1",
"node_name": "node1",
"pod_name": "pod1",
"phase": "Running",
@ -257,12 +361,64 @@ func TestPod(t *testing.T) {
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Initialized",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "waiting",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Ready",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "waiting",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(1),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Scheduled",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "waiting",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"namespace": "ns1",
"container_name": "waiting",
"node_name": "node1",
"image": "image1",
"pod_name": "pod1",
"phase": "Running",
"state": "waiting",
@ -676,6 +832,57 @@ func TestPodPendingContainers(t *testing.T) {
},
},
output: []telegraf.Metric{
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Initialized",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "waiting",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Ready",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "waiting",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(1),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Scheduled",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "waiting",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
@ -683,6 +890,7 @@ func TestPodPendingContainers(t *testing.T) {
"container_name": "waiting",
"node_name": "node1",
"pod_name": "pod1",
"image": "image1",
"phase": "Pending",
"state": "unknown",
"readiness": "unready",
@ -698,6 +906,57 @@ func TestPodPendingContainers(t *testing.T) {
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Initialized",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "terminated",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Ready",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "terminated",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(1),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
"pod_name": "pod1",
"condition": "Scheduled",
"status": "True",
"image": "image1",
"node_name": "node1",
"namespace": "ns1",
"container_name": "terminated",
},
map[string]interface{}{
"status_condition": int64(1),
"ready": int64(0),
},
time.Unix(0, 0),
),
testutil.MustMetric(
podContainerMeasurement,
map[string]string{
@ -705,6 +964,7 @@ func TestPodPendingContainers(t *testing.T) {
"container_name": "terminated",
"node_name": "node1",
"pod_name": "pod1",
"image": "image1",
"phase": "Pending",
"state": "unknown",
"readiness": "unready",

View File

@ -0,0 +1,77 @@
package kube_inventory
import (
"context"
"strings"
corev1 "k8s.io/api/core/v1"
"github.com/influxdata/telegraf"
)
// collectResourceQuotas fetches every resource quota visible to the client
// and emits one measurement per quota. A listing error is reported through
// the accumulator and aborts the collection.
func collectResourceQuotas(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
	quotas, err := ki.client.getResourceQuotas(ctx)
	if err != nil {
		acc.AddError(err)
		return
	}
	for idx := range quotas.Items {
		ki.gatherResourceQuota(quotas.Items[idx], acc)
	}
}
// gatherResourceQuota emits one measurement for a resource quota, reporting
// the hard limits and current usage for cpu, memory, and pod counts. CPU and
// memory keys gain a "_limits"/"_requests" suffix when the quota entry is
// scoped (e.g. "limits.cpu", "requests.memory"); the unscoped "cpu"/"memory"
// entries keep the bare key.
func (ki *KubernetesInventory) gatherResourceQuota(r corev1.ResourceQuota, acc telegraf.Accumulator) {
	fields := map[string]interface{}{}
	tags := map[string]string{
		"resource":  r.Name,
		"namespace": r.Namespace,
	}
	// suffix maps a quota entry name to the field-key suffix for its scope.
	suffix := func(resourceName string) string {
		switch {
		case strings.HasPrefix(resourceName, "limits."):
			return "_limits"
		case strings.HasPrefix(resourceName, "requests."):
			return "_requests"
		default:
			return ""
		}
	}
	// collect folds one ResourceList (hard or used) into fields, replacing
	// the four near-identical switch stanzas the hard/used loops shared.
	collect := func(prefix string, list corev1.ResourceList) {
		for resourceName, val := range list {
			switch resourceName {
			case "cpu", "limits.cpu", "requests.cpu":
				fields[prefix+"_cpu"+suffix(string(resourceName))] = ki.convertQuantity(val.String(), 1)
			case "memory", "limits.memory", "requests.memory":
				fields[prefix+"_memory"+suffix(string(resourceName))] = ki.convertQuantity(val.String(), 1)
			case "pods":
				fields[prefix+"_pods"] = atoi(val.String())
			}
		}
	}
	collect("hard", r.Status.Hard)
	collect("used", r.Status.Used)
	acc.AddFields(resourcequotaMeasurement, fields, tags)
}

View File

@ -0,0 +1,113 @@
package kube_inventory
import (
"testing"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// TestResourceQuota exercises gatherResourceQuota directly against mocked
// ResourceQuotaList responses: an empty list must emit nothing, and a
// populated quota must emit exactly one measurement carrying the hard/used
// cpu, memory, and pod fields.
func TestResourceQuota(t *testing.T) {
	cli := &client{}
	now := time.Now()
	tests := []struct {
		name     string
		handler  *mockHandler
		output   []telegraf.Metric
		hasError bool
	}{
		{
			name: "no ressourcequota",
			handler: &mockHandler{
				responseMap: map[string]interface{}{
					"/resourcequotas/": corev1.ResourceQuotaList{},
				},
			},
			output:   []telegraf.Metric{},
			hasError: false,
		},
		{
			name: "collect resourceqota",
			handler: &mockHandler{
				responseMap: map[string]interface{}{
					"/resourcequotas/": corev1.ResourceQuotaList{
						Items: []corev1.ResourceQuota{
							{
								Status: corev1.ResourceQuotaStatus{
									Hard: corev1.ResourceList{
										"cpu":    resource.MustParse("16"),
										"memory": resource.MustParse("125817904Ki"),
										"pods":   resource.MustParse("110"),
									},
									Used: corev1.ResourceList{
										"cpu":    resource.MustParse("10"),
										"memory": resource.MustParse("125715504Ki"),
										"pods":   resource.MustParse("0"),
									},
								},
								ObjectMeta: metav1.ObjectMeta{
									Generation: 11232,
									Namespace:  "ns1",
									Name:       "rq1",
									Labels: map[string]string{
										"lab1": "v1",
										"lab2": "v2",
									},
									CreationTimestamp: metav1.Time{Time: now},
								},
							},
						},
					},
				},
			},
			output: []telegraf.Metric{
				testutil.MustMetric(
					resourcequotaMeasurement,
					map[string]string{
						"resource":  "rq1",
						"namespace": "ns1",
					},
					map[string]interface{}{
						// 125817904Ki and 125715504Ki converted to bytes.
						"hard_cpu":    int64(16),
						"hard_memory": int64(1.28837533696e+11),
						"hard_pods":   int64(110),
						"used_cpu":    int64(10),
						"used_memory": int64(1.28732676096e+11),
						"used_pods":   int64(0),
					},
					time.Unix(0, 0),
				),
			},
			hasError: false,
		},
	}
	for _, v := range tests {
		ks := &KubernetesInventory{
			client: cli,
		}
		acc := new(testutil.Accumulator)
		// Call the gatherer directly on each mocked quota item instead of
		// going through the HTTP client.
		for _, quota := range ((v.handler.responseMap["/resourcequotas/"]).(corev1.ResourceQuotaList)).Items {
			ks.gatherResourceQuota(quota, acc)
		}
		err := acc.FirstError()
		if v.hasError {
			require.Errorf(t, err, "%s failed, should have error", v.name)
			continue
		}
		// No error case
		require.NoErrorf(t, err, "%s failed, err: %v", v.name, err)
		require.Len(t, acc.Metrics, len(v.output))
		testutil.RequireMetricsEqual(t, acc.GetTelegrafMetrics(), v.output, testutil.IgnoreTime())
	}
}

View File

@ -221,16 +221,14 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator)
acc.AddFields("kubernetes_node", fields, tags)
}
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) {
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Item, error) {
var podAPI Pods
err := k.LoadJSON(fmt.Sprintf("%s/pods", baseURL), &podAPI)
if err != nil {
return nil, err
}
podInfos := make([]Metadata, 0, len(podAPI.Items))
for _, podMetadata := range podAPI.Items {
podInfos = append(podInfos, podMetadata.Metadata)
}
podInfos := make([]Item, 0, len(podAPI.Items))
podInfos = append(podInfos, podAPI.Items...)
return podInfos, nil
}
@ -286,12 +284,16 @@ func (k *Kubernetes) LoadJSON(url string, v interface{}) error {
return nil
}
func buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) {
func buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []Item, labelFilter filter.Filter, acc telegraf.Accumulator) {
for _, pod := range summaryMetrics.Pods {
podLabels := make(map[string]string)
containerImages := make(map[string]string)
for _, info := range podInfo {
if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace {
for k, v := range info.Labels {
if info.Metadata.Name == pod.PodRef.Name && info.Metadata.Namespace == pod.PodRef.Namespace {
for _, v := range info.Spec.Containers {
containerImages[v.Name] = v.Image
}
for k, v := range info.Metadata.Labels {
if labelFilter.Match(k) {
podLabels[k] = v
}
@ -306,6 +308,15 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFi
"container_name": container.Name,
"pod_name": pod.PodRef.Name,
}
for k, v := range containerImages {
if k == container.Name {
tags["image"] = v
tok := strings.Split(v, ":")
if len(tok) == 2 {
tags["version"] = tok[1]
}
}
}
for k, v := range podLabels {
tags[k] = v
}

View File

@ -8,6 +8,7 @@ type Pods struct {
// Item is a single pod entry from the kubelet /pods endpoint, carrying both
// the pod's metadata and its spec.
type Item struct {
	Metadata Metadata `json:"metadata"`
	Spec Spec `json:"spec"`
}
type Metadata struct {
@ -15,3 +16,12 @@ type Metadata struct {
Namespace string `json:"namespace"`
Labels map[string]string `json:"labels"`
}
// Spec holds the subset of a pod spec consumed by this plugin: its container
// list.
type Spec struct {
	Containers []Container `json:"containers"`
}
// Container identifies one container in a pod spec by name and image
// reference.
type Container struct {
	Name string `json:"name"`
	Image string `json:"image"`
}