From d9b335e814f13c904825ac8cba13eb921b46c03a Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 2 Oct 2023 19:04:06 +0200 Subject: [PATCH] fix(inputs.openstack): Handle dependencies between enabled services and available endpoints (#14011) --- plugins/inputs/openstack/openstack.go | 411 ++++++++++++++------------ 1 file changed, 227 insertions(+), 184 deletions(-) diff --git a/plugins/inputs/openstack/openstack.go b/plugins/inputs/openstack/openstack.go index d4818f747..80306fecf 100644 --- a/plugins/inputs/openstack/openstack.go +++ b/plugins/inputs/openstack/openstack.go @@ -41,7 +41,6 @@ import ( "github.com/gophercloud/gophercloud/openstack/orchestration/v1/stacks" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/choice" httpconfig "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -89,20 +88,10 @@ type OpenStack struct { // Locally cached resources openstackFlavors map[string]flavors.Flavor openstackHypervisors []hypervisors.Hypervisor - diag map[string]interface{} openstackProjects map[string]projects.Project openstackServices map[string]services.Service -} -// containsService indicates whether a particular service is enabled -func (o *OpenStack) containsService(t string) bool { - for _, service := range o.openstackServices { - if service.Type == t { - return true - } - } - - return false + services map[string]bool } // convertTimeFormat, to convert time format based on HumanReadableTS @@ -122,29 +111,37 @@ func (o *OpenStack) Init() error { if len(o.EnabledServices) == 0 { o.EnabledServices = []string{"services", "projects", "hypervisors", "flavors", "networks", "volumes"} } + sort.Strings(o.EnabledServices) if o.Username == "" || o.Password == "" { return fmt.Errorf("username or password can not be empty string") } if o.TagValue == "" { return fmt.Errorf("tag_value option can not be empty string") } - sort.Strings(o.EnabledServices) + + // Check the enabled services + o.services = make(map[string]bool, len(o.EnabledServices)) + for _, service := range o.EnabledServices { + switch service { + case "agents", "aggregates", "cinder_services", "flavors", "hypervisors", + "networks", "nova_services", "ports", "projects", "servers", "services", + "stacks", "storage_pools", "subnets", "volumes": + o.services[service] = true + default: + return fmt.Errorf("invalid service %q", service) + } + } + return nil +} + +func (o *OpenStack) Start(_ telegraf.Accumulator) error { o.openstackFlavors = map[string]flavors.Flavor{} o.openstackHypervisors = []hypervisors.Hypervisor{} - o.diag = map[string]interface{}{} o.openstackProjects = map[string]projects.Project{} o.openstackServices = map[string]services.Service{} // Authenticate against Keystone and get a token provider - authOption := gophercloud.AuthOptions{ - IdentityEndpoint: o.IdentityEndpoint, - DomainName: o.Domain, - TenantName: o.Project, - Username: o.Username, - Password: o.Password, - AllowReauth: true, - } - provider, err := openstack.NewClient(authOption.IdentityEndpoint) + provider, err := openstack.NewClient(o.IdentityEndpoint) if err != nil { return fmt.Errorf("unable to create client for OpenStack endpoint: %w", err) } @@ -157,77 +154,145 @@ func (o *OpenStack) Init() error { provider.HTTPClient = *client + // Authenticate to the endpoint + authOption := gophercloud.AuthOptions{ + IdentityEndpoint: o.IdentityEndpoint, + DomainName: o.Domain, + TenantName: o.Project, + Username: o.Username, + Password: o.Password, + AllowReauth: true, + } if err := openstack.Authenticate(provider, authOption); err != nil { return fmt.Errorf("unable to authenticate OpenStack user: %w", err) } // Create required clients and attach to the OpenStack struct - if o.identity, err = openstack.NewIdentityV3(provider, gophercloud.EndpointOpts{}); err != nil { + o.identity, err = openstack.NewIdentityV3(provider, gophercloud.EndpointOpts{}) + if err != nil { return fmt.Errorf("unable to create V3 identity client: %w", err) } - - if err := o.gatherServices(); err != nil { - return fmt.Errorf("failed to get resource openstack services: %w", err) - } - - if o.compute, err = openstack.NewComputeV2(provider, gophercloud.EndpointOpts{}); err != nil { + o.compute, err = openstack.NewComputeV2(provider, gophercloud.EndpointOpts{}) + if err != nil { return fmt.Errorf("unable to create V2 compute client: %w", err) } - - // Create required clients and attach to the OpenStack struct - if o.network, err = openstack.NewNetworkV2(provider, gophercloud.EndpointOpts{}); err != nil { + o.network, err = openstack.NewNetworkV2(provider, gophercloud.EndpointOpts{}) + if err != nil { return fmt.Errorf("unable to create V2 network client: %w", err) } - // The Orchestration service is optional - if o.containsService("orchestration") { - if o.stack, err = openstack.NewOrchestrationV1(provider, gophercloud.EndpointOpts{}); err != nil { - return fmt.Errorf("unable to create V1 stack client: %w", err) + // Determine the services available at the endpoint + if err := o.availableServices(); err != nil { + return fmt.Errorf("failed to get resource openstack services: %w", err) + } + + // Setup the optional services + var hasOrchestration bool + var hasBlockStorage bool + for _, available := range o.openstackServices { + switch available.Type { + case "orchestration": + o.stack, err = openstack.NewOrchestrationV1(provider, gophercloud.EndpointOpts{}) + if err != nil { + return fmt.Errorf("unable to create V1 stack client: %w", err) + } + hasOrchestration = true + case "volumev3": + o.volume, err = openstack.NewBlockStorageV3(provider, gophercloud.EndpointOpts{}) + if err != nil { + return fmt.Errorf("unable to create V3 volume client: %w", err) + } } } - // The Cinder volume storage service is optional - if o.containsService("volumev3") { - if o.volume, err = openstack.NewBlockStorageV3(provider, gophercloud.EndpointOpts{}); err != nil { - return fmt.Errorf("unable to create V3 volume client: %w", err) + // Check if we need to disable services that are enabled by the user + if !hasOrchestration { + if o.services["stacks"] { + o.Log.Warn("Disabling \"stacks\" service because orchestration is not available at the endpoint!") + delete(o.services, "stacks") + } + } + if !hasBlockStorage { + for _, s := range []string{"cinder_services", "storage_pools", "volumes"} { + if o.services[s] { + o.Log.Warnf("Disabling %q service because block-storage is not available at the endpoint!", s) + delete(o.services, s) + } } } return nil } +func (o *OpenStack) Stop() {} + // Gather gathers resources from the OpenStack API and accumulates metrics. This // implements the Input interface. func (o *OpenStack) Gather(acc telegraf.Accumulator) error { - // Gather resources. Note service harvesting must come first as the other - // gatherers are dependant on this information. - gatherers := map[string]func(telegraf.Accumulator) error{ - "projects": o.gatherProjects, - "hypervisors": o.gatherHypervisors, - "flavors": o.gatherFlavors, - "servers": o.gatherServers, - "volumes": o.gatherVolumes, - "storage_pools": o.gatherStoragePools, - "subnets": o.gatherSubnets, - "ports": o.gatherPorts, - "networks": o.gatherNetworks, - "aggregates": o.gatherAggregates, - "nova_services": o.gatherNovaServices, - "cinder_services": o.gatherCinderServices, - "agents": o.gatherAgents, - "stacks": o.gatherStacks, + callDuration := make(map[string]interface{}, len(o.services)) + + // Prepare the shared resources + if o.services["hypervisors"] || o.services["servers"] || o.ServerDiagnotics { + start := time.Now() + if err := o.gatherHypervisors(); err != nil { + acc.AddError(fmt.Errorf("failed to get resource \"hypervisors\": %w", err)) + } + if o.services["hypervisors"] { + callDuration["hypervisors"] = time.Since(start).Nanoseconds() + } + } + // Servers were already queried, so use this information + if o.services["servers"] || o.ServerDiagnotics { + start := time.Now() + if err := o.gatherServers(acc); err != nil { + return fmt.Errorf("failed to get resource \"servers\": %w", err) + } + callDuration["servers"] = time.Since(start).Nanoseconds() } - callDuration := map[string]interface{}{} - for _, service := range o.EnabledServices { - // As Services are already gathered in Init(), using this to accumulate them. - if service == "services" { + for service := range o.services { + var err error + + start := time.Now() + switch service { + case "services": + // As Services are already gathered in Init(), using this to accumulate them. o.accumulateServices(acc) continue + case "projects": + err = o.gatherProjects(acc) + case "hypervisors": + // Gathered as part of the shared resource + o.accumulateHypervisor(acc) + continue + case "flavors": + err = o.gatherFlavors(acc) + case "servers": + // Gathered as part of the shared resource + case "volumes": + err = o.gatherVolumes(acc) + case "storage_pools": + err = o.gatherStoragePools(acc) + case "subnets": + err = o.gatherSubnets(acc) + case "ports": + err = o.gatherPorts(acc) + case "networks": + err = o.gatherNetworks(acc) + case "aggregates": + err = o.gatherAggregates(acc) + case "nova_services": + err = o.gatherNovaServices(acc) + case "cinder_services": + err = o.gatherCinderServices(acc) + case "agents": + err = o.gatherAgents(acc) + case "stacks": + err = o.gatherStacks(acc) + default: + return fmt.Errorf("invalid service %q", service) } - start := time.Now() - gatherer := gatherers[service] - if err := gatherer(acc); err != nil { + if err != nil { acc.AddError(fmt.Errorf("failed to get resource %q: %w", service, err)) } callDuration[service] = time.Since(start).Nanoseconds() @@ -239,22 +304,12 @@ func (o *OpenStack) Gather(acc telegraf.Accumulator) error { } } - if o.ServerDiagnotics { - if !choice.Contains("servers", o.EnabledServices) { - if err := o.gatherServers(acc); err != nil { - acc.AddError(fmt.Errorf("failed to get resource server diagnostics: %w", err)) - return nil - } - } - o.accumulateServerDiagnostics(acc) - } - return nil } -// gatherServices collects services from the OpenStack API. -func (o *OpenStack) gatherServices() error { - page, err := services.List(o.identity, &services.ListOpts{}).AllPages() +// availableServices collects the available endpoint services via API +func (o *OpenStack) availableServices() error { + page, err := services.List(o.identity, nil).AllPages() if err != nil { return fmt.Errorf("unable to list services: %w", err) } @@ -592,7 +647,7 @@ func (o *OpenStack) gatherProjects(acc telegraf.Accumulator) error { } // gatherHypervisors collects and accumulates hypervisors data from the OpenStack API. -func (o *OpenStack) gatherHypervisors(acc telegraf.Accumulator) error { +func (o *OpenStack) gatherHypervisors() error { page, err := hypervisors.List(o.compute, hypervisors.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list hypervisors: %w", err) @@ -602,45 +657,7 @@ func (o *OpenStack) gatherHypervisors(acc telegraf.Accumulator) error { return fmt.Errorf("unable to extract hypervisors: %w", err) } o.openstackHypervisors = extractedHypervisors - if choice.Contains("hypervisors", o.EnabledServices) { - for _, hypervisor := range extractedHypervisors { - tags := map[string]string{ - "cpu_vendor": hypervisor.CPUInfo.Vendor, - "cpu_arch": hypervisor.CPUInfo.Arch, - "cpu_model": hypervisor.CPUInfo.Model, - "status": strings.ToLower(hypervisor.Status), - "state": hypervisor.State, - "hypervisor_hostname": hypervisor.HypervisorHostname, - "hypervisor_type": hypervisor.HypervisorType, - "hypervisor_version": strconv.Itoa(hypervisor.HypervisorVersion), - "service_host": hypervisor.Service.Host, - "service_id": hypervisor.Service.ID, - "service_disabled_reason": hypervisor.Service.DisabledReason, - } - for _, cpuFeature := range hypervisor.CPUInfo.Features { - tags["cpu_feature_"+cpuFeature] = "true" - } - fields := map[string]interface{}{ - "id": hypervisor.ID, - "host_ip": hypervisor.HostIP, - "cpu_topology_sockets": hypervisor.CPUInfo.Topology.Sockets, - "cpu_topology_cores": hypervisor.CPUInfo.Topology.Cores, - "cpu_topology_threads": hypervisor.CPUInfo.Topology.Threads, - "current_workload": hypervisor.CurrentWorkload, - "disk_available_least": hypervisor.DiskAvailableLeast, - "free_disk_gb": hypervisor.FreeDiskGB, - "free_ram_mb": hypervisor.FreeRamMB, - "local_gb": hypervisor.LocalGB, - "local_gb_used": hypervisor.LocalGBUsed, - "memory_mb": hypervisor.MemoryMB, - "memory_mb_used": hypervisor.MemoryMBUsed, - "running_vms": hypervisor.RunningVMs, - "vcpus": hypervisor.VCPUs, - "vcpus_used": hypervisor.VCPUsUsed, - } - acc.AddFields("openstack_hypervisor", fields, tags) - } - } + return nil } @@ -758,12 +775,6 @@ func (o *OpenStack) gatherStoragePools(acc telegraf.Accumulator) error { // gatherServers collects servers from the OpenStack API. func (o *OpenStack) gatherServers(acc telegraf.Accumulator) error { - if !choice.Contains("hypervisors", o.EnabledServices) { - if err := o.gatherHypervisors(acc); err != nil { - acc.AddError(fmt.Errorf("failed to get resource hypervisors: %w", err)) - } - } - serverGather := choice.Contains("servers", o.EnabledServices) for _, hypervisor := range o.openstackHypervisors { page, err := servers.List(o.compute, &servers.ListOpts{AllTenants: true, Host: hypervisor.HypervisorHostname}).AllPages() if err != nil { @@ -774,23 +785,62 @@ func (o *OpenStack) gatherServers(acc telegraf.Accumulator) error { return fmt.Errorf("unable to extract servers: %w", err) } for _, server := range extractedServers { - if serverGather { + if o.services["server"] { o.accumulateServer(acc, server, hypervisor.HypervisorHostname) } - if !o.ServerDiagnotics || server.Status != "ACTIVE" { - continue + if o.ServerDiagnotics && server.Status == "ACTIVE" { + diagnostic, err := diagnostics.Get(o.compute, server.ID).Extract() + if err != nil { + acc.AddError(fmt.Errorf("unable to get diagnostics for server %q: %w", server.ID, err)) + continue + } + o.accumulateServerDiagnostics(acc, hypervisor.HypervisorHostname, server.ID, diagnostic) } - diagnostic, err := diagnostics.Get(o.compute, server.ID).Extract() - if err != nil { - acc.AddError(fmt.Errorf("unable to get diagnostics for server %q: %w", server.ID, err)) - continue - } - o.diag[server.ID] = diagnostic } } return nil } +func (o *OpenStack) accumulateHypervisor(acc telegraf.Accumulator) { + for _, hypervisor := range o.openstackHypervisors { + tags := map[string]string{ + "cpu_vendor": hypervisor.CPUInfo.Vendor, + "cpu_arch": hypervisor.CPUInfo.Arch, + "cpu_model": hypervisor.CPUInfo.Model, + "status": strings.ToLower(hypervisor.Status), + "state": hypervisor.State, + "hypervisor_hostname": hypervisor.HypervisorHostname, + "hypervisor_type": hypervisor.HypervisorType, + "hypervisor_version": strconv.Itoa(hypervisor.HypervisorVersion), + "service_host": hypervisor.Service.Host, + "service_id": hypervisor.Service.ID, + "service_disabled_reason": hypervisor.Service.DisabledReason, + } + for _, cpuFeature := range hypervisor.CPUInfo.Features { + tags["cpu_feature_"+cpuFeature] = "true" + } + fields := map[string]interface{}{ + "id": hypervisor.ID, + "host_ip": hypervisor.HostIP, + "cpu_topology_sockets": hypervisor.CPUInfo.Topology.Sockets, + "cpu_topology_cores": hypervisor.CPUInfo.Topology.Cores, + "cpu_topology_threads": hypervisor.CPUInfo.Topology.Threads, + "current_workload": hypervisor.CurrentWorkload, + "disk_available_least": hypervisor.DiskAvailableLeast, + "free_disk_gb": hypervisor.FreeDiskGB, + "free_ram_mb": hypervisor.FreeRamMB, + "local_gb": hypervisor.LocalGB, + "local_gb_used": hypervisor.LocalGBUsed, + "memory_mb": hypervisor.MemoryMB, + "memory_mb_used": hypervisor.MemoryMBUsed, + "running_vms": hypervisor.RunningVMs, + "vcpus": hypervisor.VCPUs, + "vcpus_used": hypervisor.VCPUsUsed, + } + acc.AddFields("openstack_hypervisor", fields, tags) + } +} + // accumulateServices accumulates statistics of services. func (o *OpenStack) accumulateServices(acc telegraf.Accumulator) { for _, service := range o.openstackServices { @@ -871,59 +921,52 @@ func (o *OpenStack) accumulateServer(acc telegraf.Accumulator, server servers.Se // accumulateServerDiagnostics accumulates statistics from the compute(nova) service. // currently only supports 'libvirt' driver. -func (o *OpenStack) accumulateServerDiagnostics(acc telegraf.Accumulator) { - for serverID, diagnostic := range o.diag { - s, ok := diagnostic.(map[string]interface{}) - if !ok { - o.Log.Warnf("unknown type for diagnostics %T", diagnostic) - continue - } - tags := map[string]string{ - "server_id": serverID, - } - fields := map[string]interface{}{} - portName := make(map[string]bool) - storageName := make(map[string]bool) - memoryStats := make(map[string]interface{}) - for k, v := range s { - if typePort.MatchString(k) { - portName[strings.Split(k, "_")[0]] = true - } else if typeCPU.MatchString(k) { - fields[k] = v - } else if typeStorage.MatchString(k) { - storageName[strings.Split(k, "_")[0]] = true - } else { - memoryStats[k] = v - } - } - fields["memory"] = memoryStats["memory"] - fields["memory-actual"] = memoryStats["memory-actual"] - fields["memory-rss"] = memoryStats["memory-rss"] - fields["memory-swap_in"] = memoryStats["memory-swap_in"] - tags["no_of_ports"] = strconv.Itoa(len(portName)) - tags["no_of_disks"] = strconv.Itoa(len(storageName)) - for key := range storageName { - fields["disk_errors"] = s[key+"_errors"] - fields["disk_read"] = s[key+"_read"] - fields["disk_read_req"] = s[key+"_read_req"] - fields["disk_write"] = s[key+"_write"] - fields["disk_write_req"] = s[key+"_write_req"] - tags["disk_name"] = key - acc.AddFields("openstack_server_diagnostics", fields, tags) - } - for key := range portName { - fields["port_rx"] = s[key+"_rx"] - fields["port_rx_drop"] = s[key+"_rx_drop"] - fields["port_rx_errors"] = s[key+"_rx_errors"] - fields["port_rx_packets"] = s[key+"_rx_packets"] - fields["port_tx"] = s[key+"_tx"] - fields["port_tx_drop"] = s[key+"_tx_drop"] - fields["port_tx_errors"] = s[key+"_tx_errors"] - fields["port_tx_packets"] = s[key+"_tx_packets"] - tags["port_name"] = key - acc.AddFields("openstack_server_diagnostics", fields, tags) +func (o *OpenStack) accumulateServerDiagnostics(acc telegraf.Accumulator, _, serverID string, diagnostic map[string]interface{}) { + tags := map[string]string{ + "server_id": serverID, + } + fields := map[string]interface{}{} + portName := make(map[string]bool) + storageName := make(map[string]bool) + memoryStats := make(map[string]interface{}) + for k, v := range diagnostic { + if typePort.MatchString(k) { + portName[strings.Split(k, "_")[0]] = true + } else if typeCPU.MatchString(k) { + fields[k] = v + } else if typeStorage.MatchString(k) { + storageName[strings.Split(k, "_")[0]] = true + } else { + memoryStats[k] = v } } + fields["memory"] = memoryStats["memory"] + fields["memory-actual"] = memoryStats["memory-actual"] + fields["memory-rss"] = memoryStats["memory-rss"] + fields["memory-swap_in"] = memoryStats["memory-swap_in"] + tags["no_of_ports"] = strconv.Itoa(len(portName)) + tags["no_of_disks"] = strconv.Itoa(len(storageName)) + for key := range storageName { + fields["disk_errors"] = diagnostic[key+"_errors"] + fields["disk_read"] = diagnostic[key+"_read"] + fields["disk_read_req"] = diagnostic[key+"_read_req"] + fields["disk_write"] = diagnostic[key+"_write"] + fields["disk_write_req"] = diagnostic[key+"_write_req"] + tags["disk_name"] = key + acc.AddFields("openstack_server_diagnostics", fields, tags) + } + for key := range portName { + fields["port_rx"] = diagnostic[key+"_rx"] + fields["port_rx_drop"] = diagnostic[key+"_rx_drop"] + fields["port_rx_errors"] = diagnostic[key+"_rx_errors"] + fields["port_rx_packets"] = diagnostic[key+"_rx_packets"] + fields["port_tx"] = diagnostic[key+"_tx"] + fields["port_tx_drop"] = diagnostic[key+"_tx_drop"] + fields["port_tx_errors"] = diagnostic[key+"_tx_errors"] + fields["port_tx_packets"] = diagnostic[key+"_tx_packets"] + tags["port_name"] = key + acc.AddFields("openstack_server_diagnostics", fields, tags) + } } // init registers a callback which creates a new OpenStack input instance.