chore(inputs.win_wmi): Cleanup and refactor code (#14965)

This commit is contained in:
Sven Rebhan 2024-03-12 15:45:26 +01:00 committed by GitHub
parent f7237170b9
commit 8183d4730c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 452 additions and 477 deletions

View File

@ -0,0 +1,158 @@
//go:build windows
package win_wmi
import (
"errors"
"fmt"
"runtime"
"strings"
"github.com/go-ole/go-ole"
"github.com/go-ole/go-ole/oleutil"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
)
// Query struct
type Query struct {
Namespace string `toml:"namespace"`
ClassName string `toml:"class_name"`
Properties []string `toml:"properties"`
Filter string `toml:"filter"`
TagPropertiesInclude []string `toml:"tag_properties"`
tagFilter filter.Filter
query string
}
func (q *Query) prepare() error {
// Compile the filter
f, err := filter.Compile(q.TagPropertiesInclude)
if err != nil {
return fmt.Errorf("compiling tag-filter failed: %w", err)
}
q.tagFilter = f
// Construct the overall query from the given parts
wql := fmt.Sprintf("SELECT %s FROM %s", strings.Join(q.Properties, ", "), q.ClassName)
if len(q.Filter) > 0 {
wql += " WHERE " + q.Filter
}
q.query = wql
return nil
}
func (q *Query) execute(acc telegraf.Accumulator) error {
// The only way to run WMI queries in parallel while being thread-safe is to
// ensure the CoInitialize[Ex]() call is bound to its current OS thread.
// Otherwise, attempting to initialize and run parallel queries across
// goroutines will result in protected memory errors.
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// init COM
if err := ole.CoInitializeEx(0, ole.COINIT_MULTITHREADED); err != nil {
var oleCode *ole.OleError
if errors.As(err, &oleCode) && oleCode.Code() != ole.S_OK && oleCode.Code() != sFalse {
return err
}
}
defer ole.CoUninitialize()
unknown, err := oleutil.CreateObject("WbemScripting.SWbemLocator")
if err != nil {
return err
}
if unknown == nil {
return errors.New("failed to create WbemScripting.SWbemLocator, maybe WMI is broken")
}
defer unknown.Release()
wmi, err := unknown.QueryInterface(ole.IID_IDispatch)
if err != nil {
return fmt.Errorf("failed to QueryInterface: %w", err)
}
defer wmi.Release()
// service is a SWbemServices
serviceRaw, err := oleutil.CallMethod(wmi, "ConnectServer", nil, q.Namespace)
if err != nil {
return fmt.Errorf("failed calling method ConnectServer: %w", err)
}
service := serviceRaw.ToIDispatch()
defer serviceRaw.Clear()
// result is a SWBemObjectSet
resultRaw, err := oleutil.CallMethod(service, "ExecQuery", q.query)
if err != nil {
return fmt.Errorf("failed calling method ExecQuery for query %s: %w", q.query, err)
}
result := resultRaw.ToIDispatch()
defer resultRaw.Clear()
countRaw, err := oleutil.GetProperty(result, "Count")
if err != nil {
return fmt.Errorf("failed getting Count: %w", err)
}
count := countRaw.Val
defer countRaw.Clear()
for i := int64(0); i < count; i++ {
itemRaw, err := oleutil.CallMethod(result, "ItemIndex", i)
if err != nil {
return fmt.Errorf("failed calling method ItemIndex: %w", err)
}
if err := q.extractProperties(acc, itemRaw); err != nil {
return err
}
}
return nil
}
func (q *Query) extractProperties(acc telegraf.Accumulator, itemRaw *ole.VARIANT) error {
tags, fields := map[string]string{}, map[string]interface{}{}
item := itemRaw.ToIDispatch()
defer item.Release()
for _, name := range q.Properties {
propertyRaw, err := oleutil.GetProperty(item, name)
if err != nil {
return fmt.Errorf("getting property %q failed: %w", name, err)
}
value := propertyRaw.Value()
propertyRaw.Clear()
if q.tagFilter != nil && q.tagFilter.Match(name) {
s, err := internal.ToString(value)
if err != nil {
return fmt.Errorf("converting property %q failed: %w", s, err)
}
tags[name] = s
continue
}
switch v := value.(type) {
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
fields[name] = v
case string:
fields[name] = v
case bool:
fields[name] = v
case []byte:
fields[name] = string(v)
case fmt.Stringer:
fields[name] = v.String()
case nil:
fields[name] = nil
default:
return fmt.Errorf("property %q of type \"%T\" unsupported", name, v)
}
}
acc.AddFields(q.ClassName, fields, tags)
return nil
}

View File

@ -1,235 +1,54 @@
//go:generate ../../../tools/readme_config_includer/generator
//go:build windows
// +build windows
package win_wmi
import (
_ "embed"
"errors"
"fmt"
"runtime"
"strings"
"sync"
"github.com/go-ole/go-ole"
"github.com/go-ole/go-ole/oleutil"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
// Query struct
type Query struct {
query string
Namespace string `toml:"namespace"`
ClassName string `toml:"class_name"`
Properties []string `toml:"properties"`
Filter string `toml:"filter"`
TagPropertiesInclude []string `toml:"tag_properties"`
tagFilter filter.Filter
}
// Wmi struct
type Wmi struct {
Queries []Query `toml:"query"`
Log telegraf.Logger
Log telegraf.Logger `toml:"-"`
}
// S_FALSE is returned by CoInitializeEx if it was already called on this thread.
const sFalse = 0x00000001
func oleInt64(item *ole.IDispatch, prop string) (int64, error) {
v, err := oleutil.GetProperty(item, prop)
if err != nil {
return 0, err
}
defer v.Clear()
return v.Val, nil
}
// Init function
func (s *Wmi) Init() error {
return compileInputs(s)
func (w *Wmi) Init() error {
for i := range w.Queries {
q := &w.Queries[i]
if err := q.prepare(); err != nil {
return fmt.Errorf("preparing query %q failed: %w", q.ClassName, err)
}
}
return nil
}
// SampleConfig function
func (s *Wmi) SampleConfig() string {
func (*Wmi) SampleConfig() string {
return sampleConfig
}
func compileInputs(s *Wmi) error {
buildWqlStatements(s)
return compileTagFilters(s)
}
func compileTagFilters(s *Wmi) error {
for i, q := range s.Queries {
var err error
s.Queries[i].tagFilter, err = compileTagFilter(q)
if err != nil {
return err
}
}
return nil
}
func compileTagFilter(q Query) (filter.Filter, error) {
tagFilter, err := filter.NewIncludeExcludeFilterDefaults(q.TagPropertiesInclude, nil, false, false)
if err != nil {
return nil, fmt.Errorf("creating tag filter failed: %w", err)
}
return tagFilter, nil
}
// build a WMI query from input configuration
func buildWqlStatements(s *Wmi) {
for i, q := range s.Queries {
wql := fmt.Sprintf("SELECT %s FROM %s", strings.Join(q.Properties, ", "), q.ClassName)
if len(q.Filter) > 0 {
wql = fmt.Sprintf("%s WHERE %s", wql, q.Filter)
}
s.Queries[i].query = wql
}
}
func (q *Query) doQuery(acc telegraf.Accumulator) error {
// The only way to run WMI queries in parallel while being thread-safe is to
// ensure the CoInitialize[Ex]() call is bound to its current OS thread.
// Otherwise, attempting to initialize and run parallel queries across
// goroutines will result in protected memory errors.
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// init COM
if err := ole.CoInitializeEx(0, ole.COINIT_MULTITHREADED); err != nil {
var oleCode *ole.OleError
if errors.As(err, &oleCode) && oleCode.Code() != ole.S_OK && oleCode.Code() != sFalse {
return err
}
}
defer ole.CoUninitialize()
unknown, err := oleutil.CreateObject("WbemScripting.SWbemLocator")
if err != nil {
return err
}
if unknown == nil {
return errors.New("failed to create WbemScripting.SWbemLocator, maybe WMI is broken")
}
defer unknown.Release()
wmi, err := unknown.QueryInterface(ole.IID_IDispatch)
if err != nil {
return fmt.Errorf("failed to QueryInterface: %w", err)
}
defer wmi.Release()
// service is a SWbemServices
serviceRaw, err := oleutil.CallMethod(wmi, "ConnectServer", nil, q.Namespace)
if err != nil {
return fmt.Errorf("failed calling method ConnectServer: %w", err)
}
service := serviceRaw.ToIDispatch()
defer serviceRaw.Clear()
// result is a SWBemObjectSet
resultRaw, err := oleutil.CallMethod(service, "ExecQuery", q.query)
if err != nil {
return fmt.Errorf("failed calling method ExecQuery for query %s: %w", q.query, err)
}
result := resultRaw.ToIDispatch()
defer resultRaw.Clear()
count, err := oleInt64(result, "Count")
if err != nil {
return fmt.Errorf("failed getting Count: %w", err)
}
for i := int64(0); i < count; i++ {
// item is a SWbemObject
itemRaw, err := oleutil.CallMethod(result, "ItemIndex", i)
if err != nil {
return fmt.Errorf("failed calling method ItemIndex: %w", err)
}
err = q.extractProperties(itemRaw, acc)
if err != nil {
return err
}
}
return nil
}
func (q *Query) extractProperties(itemRaw *ole.VARIANT, acc telegraf.Accumulator) error {
tags, fields := map[string]string{}, map[string]interface{}{}
item := itemRaw.ToIDispatch()
defer item.Release()
for _, wmiProperty := range q.Properties {
propertyValue, err := getPropertyValue(item, wmiProperty)
if err != nil {
return err
}
if q.tagFilter.Match(wmiProperty) {
valStr, err := internal.ToString(propertyValue)
if err != nil {
return fmt.Errorf("converting property %q failed: %w", wmiProperty, err)
}
tags[wmiProperty] = valStr
} else {
var fieldValue interface{}
switch v := propertyValue.(type) {
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
fieldValue = v
case string:
fieldValue = v
case bool:
fieldValue = v
case []byte:
fieldValue = string(v)
case fmt.Stringer:
fieldValue = v.String()
case nil:
fieldValue = nil
default:
return fmt.Errorf("property %q of type \"%T\" unsupported", wmiProperty, v)
}
fields[wmiProperty] = fieldValue
}
}
acc.AddFields(q.ClassName, fields, tags)
return nil
}
func getPropertyValue(item *ole.IDispatch, wmiProperty string) (interface{}, error) {
prop, err := oleutil.GetProperty(item, wmiProperty)
if err != nil {
return nil, fmt.Errorf("failed GetProperty: %w", err)
}
defer prop.Clear()
return prop.Value(), nil
}
// Gather function
func (s *Wmi) Gather(acc telegraf.Accumulator) error {
func (w *Wmi) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, query := range s.Queries {
for _, query := range w.Queries {
wg.Add(1)
go func(q Query) {
defer wg.Done()
err := q.doQuery(acc)
if err != nil {
acc.AddError(err)
}
acc.AddError(q.execute(acc))
}(query)
}
wg.Wait()

View File

@ -21,11 +21,9 @@ func (w *Wmi) Init() error {
w.Log.Warn("current platform is not supported")
return nil
}
func (w *Wmi) SampleConfig() string { return sampleConfig }
func (w *Wmi) Gather(_ telegraf.Accumulator) error { return nil }
func (*Wmi) SampleConfig() string { return sampleConfig }
func (*Wmi) Gather(_ telegraf.Accumulator) error { return nil }
func init() {
inputs.Add("win_wmi", func() telegraf.Input {
return &Wmi{}
})
inputs.Add("win_wmi", func() telegraf.Input { return &Wmi{} })
}