fix(inputs.openstack): Handle dependencies between enabled services and available endpoints (#14011)

This commit is contained in:
Sven Rebhan 2023-10-02 19:04:06 +02:00 committed by GitHub
parent 904c95a94b
commit d9b335e814
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 227 additions and 184 deletions

View File

@ -41,7 +41,6 @@ import (
"github.com/gophercloud/gophercloud/openstack/orchestration/v1/stacks"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -89,20 +88,10 @@ type OpenStack struct {
// Locally cached resources
openstackFlavors map[string]flavors.Flavor
openstackHypervisors []hypervisors.Hypervisor
diag map[string]interface{}
openstackProjects map[string]projects.Project
openstackServices map[string]services.Service
}
// containsService indicates whether a particular service is enabled
func (o *OpenStack) containsService(t string) bool {
for _, service := range o.openstackServices {
if service.Type == t {
return true
}
}
return false
services map[string]bool
}
// convertTimeFormat, to convert time format based on HumanReadableTS
@ -122,29 +111,37 @@ func (o *OpenStack) Init() error {
if len(o.EnabledServices) == 0 {
o.EnabledServices = []string{"services", "projects", "hypervisors", "flavors", "networks", "volumes"}
}
sort.Strings(o.EnabledServices)
if o.Username == "" || o.Password == "" {
return fmt.Errorf("username or password can not be empty string")
}
if o.TagValue == "" {
return fmt.Errorf("tag_value option can not be empty string")
}
sort.Strings(o.EnabledServices)
// Check the enabled services
o.services = make(map[string]bool, len(o.EnabledServices))
for _, service := range o.EnabledServices {
switch service {
case "agents", "aggregates", "cinder_services", "flavors", "hypervisors",
"networks", "nova_services", "ports", "projects", "servers", "services",
"stacks", "storage_pools", "subnets", "volumes":
o.services[service] = true
default:
return fmt.Errorf("invalid service %q", service)
}
}
return nil
}
func (o *OpenStack) Start(_ telegraf.Accumulator) error {
o.openstackFlavors = map[string]flavors.Flavor{}
o.openstackHypervisors = []hypervisors.Hypervisor{}
o.diag = map[string]interface{}{}
o.openstackProjects = map[string]projects.Project{}
o.openstackServices = map[string]services.Service{}
// Authenticate against Keystone and get a token provider
authOption := gophercloud.AuthOptions{
IdentityEndpoint: o.IdentityEndpoint,
DomainName: o.Domain,
TenantName: o.Project,
Username: o.Username,
Password: o.Password,
AllowReauth: true,
}
provider, err := openstack.NewClient(authOption.IdentityEndpoint)
provider, err := openstack.NewClient(o.IdentityEndpoint)
if err != nil {
return fmt.Errorf("unable to create client for OpenStack endpoint: %w", err)
}
@ -157,77 +154,145 @@ func (o *OpenStack) Init() error {
provider.HTTPClient = *client
// Authenticate to the endpoint
authOption := gophercloud.AuthOptions{
IdentityEndpoint: o.IdentityEndpoint,
DomainName: o.Domain,
TenantName: o.Project,
Username: o.Username,
Password: o.Password,
AllowReauth: true,
}
if err := openstack.Authenticate(provider, authOption); err != nil {
return fmt.Errorf("unable to authenticate OpenStack user: %w", err)
}
// Create required clients and attach to the OpenStack struct
if o.identity, err = openstack.NewIdentityV3(provider, gophercloud.EndpointOpts{}); err != nil {
o.identity, err = openstack.NewIdentityV3(provider, gophercloud.EndpointOpts{})
if err != nil {
return fmt.Errorf("unable to create V3 identity client: %w", err)
}
if err := o.gatherServices(); err != nil {
return fmt.Errorf("failed to get resource openstack services: %w", err)
}
if o.compute, err = openstack.NewComputeV2(provider, gophercloud.EndpointOpts{}); err != nil {
o.compute, err = openstack.NewComputeV2(provider, gophercloud.EndpointOpts{})
if err != nil {
return fmt.Errorf("unable to create V2 compute client: %w", err)
}
// Create required clients and attach to the OpenStack struct
if o.network, err = openstack.NewNetworkV2(provider, gophercloud.EndpointOpts{}); err != nil {
o.network, err = openstack.NewNetworkV2(provider, gophercloud.EndpointOpts{})
if err != nil {
return fmt.Errorf("unable to create V2 network client: %w", err)
}
// The Orchestration service is optional
if o.containsService("orchestration") {
if o.stack, err = openstack.NewOrchestrationV1(provider, gophercloud.EndpointOpts{}); err != nil {
return fmt.Errorf("unable to create V1 stack client: %w", err)
// Determine the services available at the endpoint
if err := o.availableServices(); err != nil {
return fmt.Errorf("failed to get resource openstack services: %w", err)
}
// Setup the optional services
var hasOrchestration bool
var hasBlockStorage bool
for _, available := range o.openstackServices {
switch available.Type {
case "orchestration":
o.stack, err = openstack.NewOrchestrationV1(provider, gophercloud.EndpointOpts{})
if err != nil {
return fmt.Errorf("unable to create V1 stack client: %w", err)
}
hasOrchestration = true
case "volumev3":
o.volume, err = openstack.NewBlockStorageV3(provider, gophercloud.EndpointOpts{})
if err != nil {
return fmt.Errorf("unable to create V3 volume client: %w", err)
}
}
}
// The Cinder volume storage service is optional
if o.containsService("volumev3") {
if o.volume, err = openstack.NewBlockStorageV3(provider, gophercloud.EndpointOpts{}); err != nil {
return fmt.Errorf("unable to create V3 volume client: %w", err)
// Check if we need to disable services that are enabled by the user
if !hasOrchestration {
if o.services["stacks"] {
o.Log.Warn("Disabling \"stacks\" service because orchestration is not available at the endpoint!")
delete(o.services, "stacks")
}
}
if !hasBlockStorage {
for _, s := range []string{"cinder_services", "storage_pools", "volumes"} {
if o.services[s] {
o.Log.Warnf("Disabling %q service because block-storage is not available at the endpoint!", s)
delete(o.services, s)
}
}
}
return nil
}
// Stop implements the telegraf.ServiceInput interface. The plugin holds no
// background goroutines or open resources, so there is nothing to release.
func (o *OpenStack) Stop() {}
// Gather gathers resources from the OpenStack API and accumulates metrics. This
// implements the Input interface.
func (o *OpenStack) Gather(acc telegraf.Accumulator) error {
// Gather resources. Note service harvesting must come first as the other
// gatherers are dependant on this information.
gatherers := map[string]func(telegraf.Accumulator) error{
"projects": o.gatherProjects,
"hypervisors": o.gatherHypervisors,
"flavors": o.gatherFlavors,
"servers": o.gatherServers,
"volumes": o.gatherVolumes,
"storage_pools": o.gatherStoragePools,
"subnets": o.gatherSubnets,
"ports": o.gatherPorts,
"networks": o.gatherNetworks,
"aggregates": o.gatherAggregates,
"nova_services": o.gatherNovaServices,
"cinder_services": o.gatherCinderServices,
"agents": o.gatherAgents,
"stacks": o.gatherStacks,
callDuration := make(map[string]interface{}, len(o.services))
// Prepare the shared resources
if o.services["hypervisors"] || o.services["servers"] || o.ServerDiagnotics {
start := time.Now()
if err := o.gatherHypervisors(); err != nil {
acc.AddError(fmt.Errorf("failed to get resource \"hypervisors\": %w", err))
}
if o.services["hypervisors"] {
callDuration["hypervisors"] = time.Since(start).Nanoseconds()
}
}
// Servers were already queried, so use this information
if o.services["servers"] || o.ServerDiagnotics {
start := time.Now()
if err := o.gatherServers(acc); err != nil {
return fmt.Errorf("failed to get resource \"servers\": %w", err)
}
callDuration["servers"] = time.Since(start).Nanoseconds()
}
callDuration := map[string]interface{}{}
for _, service := range o.EnabledServices {
// As Services are already gathered in Init(), using this to accumulate them.
if service == "services" {
for service := range o.services {
var err error
start := time.Now()
switch service {
case "services":
// As Services are already gathered in Init(), using this to accumulate them.
o.accumulateServices(acc)
continue
case "projects":
err = o.gatherProjects(acc)
case "hypervisors":
// Gathered as part of the shared resource
o.accumulateHypervisor(acc)
continue
case "flavors":
err = o.gatherFlavors(acc)
case "servers":
// Gathered as part of the shared resource
case "volumes":
err = o.gatherVolumes(acc)
case "storage_pools":
err = o.gatherStoragePools(acc)
case "subnets":
err = o.gatherSubnets(acc)
case "ports":
err = o.gatherPorts(acc)
case "networks":
err = o.gatherNetworks(acc)
case "aggregates":
err = o.gatherAggregates(acc)
case "nova_services":
err = o.gatherNovaServices(acc)
case "cinder_services":
err = o.gatherCinderServices(acc)
case "agents":
err = o.gatherAgents(acc)
case "stacks":
err = o.gatherStacks(acc)
default:
return fmt.Errorf("invalid service %q", service)
}
start := time.Now()
gatherer := gatherers[service]
if err := gatherer(acc); err != nil {
if err != nil {
acc.AddError(fmt.Errorf("failed to get resource %q: %w", service, err))
}
callDuration[service] = time.Since(start).Nanoseconds()
@ -239,22 +304,12 @@ func (o *OpenStack) Gather(acc telegraf.Accumulator) error {
}
}
if o.ServerDiagnotics {
if !choice.Contains("servers", o.EnabledServices) {
if err := o.gatherServers(acc); err != nil {
acc.AddError(fmt.Errorf("failed to get resource server diagnostics: %w", err))
return nil
}
}
o.accumulateServerDiagnostics(acc)
}
return nil
}
// gatherServices collects services from the OpenStack API.
func (o *OpenStack) gatherServices() error {
page, err := services.List(o.identity, &services.ListOpts{}).AllPages()
// availableServices collects the available endpoint services via API
func (o *OpenStack) availableServices() error {
page, err := services.List(o.identity, nil).AllPages()
if err != nil {
return fmt.Errorf("unable to list services: %w", err)
}
@ -592,7 +647,7 @@ func (o *OpenStack) gatherProjects(acc telegraf.Accumulator) error {
}
// gatherHypervisors collects and accumulates hypervisors data from the OpenStack API.
func (o *OpenStack) gatherHypervisors(acc telegraf.Accumulator) error {
func (o *OpenStack) gatherHypervisors() error {
page, err := hypervisors.List(o.compute, hypervisors.ListOpts{}).AllPages()
if err != nil {
return fmt.Errorf("unable to list hypervisors: %w", err)
@ -602,45 +657,7 @@ func (o *OpenStack) gatherHypervisors(acc telegraf.Accumulator) error {
return fmt.Errorf("unable to extract hypervisors: %w", err)
}
o.openstackHypervisors = extractedHypervisors
if choice.Contains("hypervisors", o.EnabledServices) {
for _, hypervisor := range extractedHypervisors {
tags := map[string]string{
"cpu_vendor": hypervisor.CPUInfo.Vendor,
"cpu_arch": hypervisor.CPUInfo.Arch,
"cpu_model": hypervisor.CPUInfo.Model,
"status": strings.ToLower(hypervisor.Status),
"state": hypervisor.State,
"hypervisor_hostname": hypervisor.HypervisorHostname,
"hypervisor_type": hypervisor.HypervisorType,
"hypervisor_version": strconv.Itoa(hypervisor.HypervisorVersion),
"service_host": hypervisor.Service.Host,
"service_id": hypervisor.Service.ID,
"service_disabled_reason": hypervisor.Service.DisabledReason,
}
for _, cpuFeature := range hypervisor.CPUInfo.Features {
tags["cpu_feature_"+cpuFeature] = "true"
}
fields := map[string]interface{}{
"id": hypervisor.ID,
"host_ip": hypervisor.HostIP,
"cpu_topology_sockets": hypervisor.CPUInfo.Topology.Sockets,
"cpu_topology_cores": hypervisor.CPUInfo.Topology.Cores,
"cpu_topology_threads": hypervisor.CPUInfo.Topology.Threads,
"current_workload": hypervisor.CurrentWorkload,
"disk_available_least": hypervisor.DiskAvailableLeast,
"free_disk_gb": hypervisor.FreeDiskGB,
"free_ram_mb": hypervisor.FreeRamMB,
"local_gb": hypervisor.LocalGB,
"local_gb_used": hypervisor.LocalGBUsed,
"memory_mb": hypervisor.MemoryMB,
"memory_mb_used": hypervisor.MemoryMBUsed,
"running_vms": hypervisor.RunningVMs,
"vcpus": hypervisor.VCPUs,
"vcpus_used": hypervisor.VCPUsUsed,
}
acc.AddFields("openstack_hypervisor", fields, tags)
}
}
return nil
}
@ -758,12 +775,6 @@ func (o *OpenStack) gatherStoragePools(acc telegraf.Accumulator) error {
// gatherServers collects servers from the OpenStack API.
func (o *OpenStack) gatherServers(acc telegraf.Accumulator) error {
if !choice.Contains("hypervisors", o.EnabledServices) {
if err := o.gatherHypervisors(acc); err != nil {
acc.AddError(fmt.Errorf("failed to get resource hypervisors: %w", err))
}
}
serverGather := choice.Contains("servers", o.EnabledServices)
for _, hypervisor := range o.openstackHypervisors {
page, err := servers.List(o.compute, &servers.ListOpts{AllTenants: true, Host: hypervisor.HypervisorHostname}).AllPages()
if err != nil {
@ -774,23 +785,62 @@ func (o *OpenStack) gatherServers(acc telegraf.Accumulator) error {
return fmt.Errorf("unable to extract servers: %w", err)
}
for _, server := range extractedServers {
if serverGather {
if o.services["server"] {
o.accumulateServer(acc, server, hypervisor.HypervisorHostname)
}
if !o.ServerDiagnotics || server.Status != "ACTIVE" {
continue
if o.ServerDiagnotics && server.Status == "ACTIVE" {
diagnostic, err := diagnostics.Get(o.compute, server.ID).Extract()
if err != nil {
acc.AddError(fmt.Errorf("unable to get diagnostics for server %q: %w", server.ID, err))
continue
}
o.accumulateServerDiagnostics(acc, hypervisor.HypervisorHostname, server.ID, diagnostic)
}
diagnostic, err := diagnostics.Get(o.compute, server.ID).Extract()
if err != nil {
acc.AddError(fmt.Errorf("unable to get diagnostics for server %q: %w", server.ID, err))
continue
}
o.diag[server.ID] = diagnostic
}
}
return nil
}
// accumulateHypervisor emits one "openstack_hypervisor" metric per hypervisor
// previously cached by gatherHypervisors().
func (o *OpenStack) accumulateHypervisor(acc telegraf.Accumulator) {
	for _, hv := range o.openstackHypervisors {
		// Numeric and identifying properties become fields.
		fields := map[string]interface{}{
			"id":                   hv.ID,
			"host_ip":              hv.HostIP,
			"cpu_topology_sockets": hv.CPUInfo.Topology.Sockets,
			"cpu_topology_cores":   hv.CPUInfo.Topology.Cores,
			"cpu_topology_threads": hv.CPUInfo.Topology.Threads,
			"current_workload":     hv.CurrentWorkload,
			"disk_available_least": hv.DiskAvailableLeast,
			"free_disk_gb":         hv.FreeDiskGB,
			"free_ram_mb":          hv.FreeRamMB,
			"local_gb":             hv.LocalGB,
			"local_gb_used":        hv.LocalGBUsed,
			"memory_mb":            hv.MemoryMB,
			"memory_mb_used":       hv.MemoryMBUsed,
			"running_vms":          hv.RunningVMs,
			"vcpus":                hv.VCPUs,
			"vcpus_used":           hv.VCPUsUsed,
		}

		// Descriptive properties become tags; each advertised CPU feature is
		// added as its own "cpu_feature_<name>" tag set to "true".
		tags := map[string]string{
			"cpu_vendor":              hv.CPUInfo.Vendor,
			"cpu_arch":                hv.CPUInfo.Arch,
			"cpu_model":               hv.CPUInfo.Model,
			"status":                  strings.ToLower(hv.Status),
			"state":                   hv.State,
			"hypervisor_hostname":     hv.HypervisorHostname,
			"hypervisor_type":         hv.HypervisorType,
			"hypervisor_version":      strconv.Itoa(hv.HypervisorVersion),
			"service_host":            hv.Service.Host,
			"service_id":              hv.Service.ID,
			"service_disabled_reason": hv.Service.DisabledReason,
		}
		for _, feature := range hv.CPUInfo.Features {
			tags["cpu_feature_"+feature] = "true"
		}

		acc.AddFields("openstack_hypervisor", fields, tags)
	}
}
// accumulateServices accumulates statistics of services.
func (o *OpenStack) accumulateServices(acc telegraf.Accumulator) {
for _, service := range o.openstackServices {
@ -871,59 +921,52 @@ func (o *OpenStack) accumulateServer(acc telegraf.Accumulator, server servers.Se
// accumulateServerDiagnostics accumulates statistics from the compute(nova) service.
// currently only supports 'libvirt' driver.
func (o *OpenStack) accumulateServerDiagnostics(acc telegraf.Accumulator) {
for serverID, diagnostic := range o.diag {
s, ok := diagnostic.(map[string]interface{})
if !ok {
o.Log.Warnf("unknown type for diagnostics %T", diagnostic)
continue
}
tags := map[string]string{
"server_id": serverID,
}
fields := map[string]interface{}{}
portName := make(map[string]bool)
storageName := make(map[string]bool)
memoryStats := make(map[string]interface{})
for k, v := range s {
if typePort.MatchString(k) {
portName[strings.Split(k, "_")[0]] = true
} else if typeCPU.MatchString(k) {
fields[k] = v
} else if typeStorage.MatchString(k) {
storageName[strings.Split(k, "_")[0]] = true
} else {
memoryStats[k] = v
}
}
fields["memory"] = memoryStats["memory"]
fields["memory-actual"] = memoryStats["memory-actual"]
fields["memory-rss"] = memoryStats["memory-rss"]
fields["memory-swap_in"] = memoryStats["memory-swap_in"]
tags["no_of_ports"] = strconv.Itoa(len(portName))
tags["no_of_disks"] = strconv.Itoa(len(storageName))
for key := range storageName {
fields["disk_errors"] = s[key+"_errors"]
fields["disk_read"] = s[key+"_read"]
fields["disk_read_req"] = s[key+"_read_req"]
fields["disk_write"] = s[key+"_write"]
fields["disk_write_req"] = s[key+"_write_req"]
tags["disk_name"] = key
acc.AddFields("openstack_server_diagnostics", fields, tags)
}
for key := range portName {
fields["port_rx"] = s[key+"_rx"]
fields["port_rx_drop"] = s[key+"_rx_drop"]
fields["port_rx_errors"] = s[key+"_rx_errors"]
fields["port_rx_packets"] = s[key+"_rx_packets"]
fields["port_tx"] = s[key+"_tx"]
fields["port_tx_drop"] = s[key+"_tx_drop"]
fields["port_tx_errors"] = s[key+"_tx_errors"]
fields["port_tx_packets"] = s[key+"_tx_packets"]
tags["port_name"] = key
acc.AddFields("openstack_server_diagnostics", fields, tags)
// accumulateServerDiagnostics flattens one server's raw nova diagnostics
// payload into "openstack_server_diagnostics" metrics. Keys are classified by
// the typePort/typeCPU/typeStorage regexes (defined elsewhere in this file);
// anything unmatched is treated as a memory statistic. The ignored second
// parameter is the hypervisor hostname passed by gatherServers; it is
// currently unused. Presumably only the 'libvirt' driver's key layout is
// supported — the key patterns suggest that, but confirm against the nova API.
func (o *OpenStack) accumulateServerDiagnostics(acc telegraf.Accumulator, _, serverID string, diagnostic map[string]interface{}) {
	tags := map[string]string{
		"server_id": serverID,
	}
	fields := map[string]interface{}{}
	// Distinct device names seen in the payload, derived from the part of the
	// key before the first underscore (e.g. "vda_read" -> "vda").
	portName := make(map[string]bool)
	storageName := make(map[string]bool)
	memoryStats := make(map[string]interface{})
	// Classify every diagnostics key by its name pattern.
	for k, v := range diagnostic {
		if typePort.MatchString(k) {
			portName[strings.Split(k, "_")[0]] = true
		} else if typeCPU.MatchString(k) {
			fields[k] = v
		} else if typeStorage.MatchString(k) {
			storageName[strings.Split(k, "_")[0]] = true
		} else {
			memoryStats[k] = v
		}
	}
	fields["memory"] = memoryStats["memory"]
	fields["memory-actual"] = memoryStats["memory-actual"]
	fields["memory-rss"] = memoryStats["memory-rss"]
	fields["memory-swap_in"] = memoryStats["memory-swap_in"]
	tags["no_of_ports"] = strconv.Itoa(len(portName))
	tags["no_of_disks"] = strconv.Itoa(len(storageName))
	// NOTE(review): `fields` and `tags` are reused and mutated across the two
	// loops below, so each emitted metric also carries the most recently
	// written disk/port values and the CPU/memory fields. This matches the
	// pre-refactor behavior; map iteration order makes the carried-over
	// disk/port values nondeterministic — flagging, not changing.
	for key := range storageName {
		fields["disk_errors"] = diagnostic[key+"_errors"]
		fields["disk_read"] = diagnostic[key+"_read"]
		fields["disk_read_req"] = diagnostic[key+"_read_req"]
		fields["disk_write"] = diagnostic[key+"_write"]
		fields["disk_write_req"] = diagnostic[key+"_write_req"]
		tags["disk_name"] = key
		acc.AddFields("openstack_server_diagnostics", fields, tags)
	}
	for key := range portName {
		fields["port_rx"] = diagnostic[key+"_rx"]
		fields["port_rx_drop"] = diagnostic[key+"_rx_drop"]
		fields["port_rx_errors"] = diagnostic[key+"_rx_errors"]
		fields["port_rx_packets"] = diagnostic[key+"_rx_packets"]
		fields["port_tx"] = diagnostic[key+"_tx"]
		fields["port_tx_drop"] = diagnostic[key+"_tx_drop"]
		fields["port_tx_errors"] = diagnostic[key+"_tx_errors"]
		fields["port_tx_packets"] = diagnostic[key+"_tx_packets"]
		tags["port_name"] = key
		acc.AddFields("openstack_server_diagnostics", fields, tags)
	}
}
// init registers a callback which creates a new OpenStack input instance.