chore(inputs.gnmi): Test plugin and subscription options (#13384)

This commit is contained in:
Sven Rebhan 2023-06-05 21:56:09 +02:00 committed by GitHub
parent b24c832a41
commit 187902ecd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 49 deletions

View File

@ -3,7 +3,6 @@ package gnmi
import ( import (
"context" "context"
"crypto/tls"
_ "embed" _ "embed"
"fmt" "fmt"
"path" "path"
@ -58,13 +57,12 @@ type GNMI struct {
MaxMsgSize config.Size `toml:"max_msg_size"` MaxMsgSize config.Size `toml:"max_msg_size"`
Trace bool `toml:"dump_responses"` Trace bool `toml:"dump_responses"`
CanonicalFieldNames bool `toml:"canonical_field_names"` CanonicalFieldNames bool `toml:"canonical_field_names"`
EnableTLS bool `toml:"enable_tls"` EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;use 'tls_enable' instead"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
internaltls.ClientConfig internaltls.ClientConfig
// Internal state // Internal state
internalAliases map[string]string internalAliases map[string]string
acc telegraf.Accumulator
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -100,17 +98,35 @@ func (*GNMI) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Start the http listener service func (c *GNMI) Init() error {
func (c *GNMI) Start(acc telegraf.Accumulator) error { // Check options
var err error if time.Duration(c.Redial) <= 0 {
var ctx context.Context return fmt.Errorf("redial duration must be positive")
var tlscfg *tls.Config }
var request *gnmiLib.SubscribeRequest
c.acc = acc
ctx, c.cancel = context.WithCancel(context.Background())
// Check vendor_specific options configured by user
if err := choice.CheckSlice(c.VendorSpecific, supportedExtensions); err != nil {
return fmt.Errorf("unsupported vendor_specific option: %w", err)
}
// Use the new TLS option for enabling
// Honor deprecated option
enable := (c.ClientConfig.Enable != nil && *c.ClientConfig.Enable) || c.EnableTLS
c.ClientConfig.Enable = &enable
// Split the subscriptions into "normal" and "tag" subscription
// and prepare them.
for i := len(c.Subscriptions) - 1; i >= 0; i-- { for i := len(c.Subscriptions) - 1; i >= 0; i-- {
subscription := c.Subscriptions[i] subscription := c.Subscriptions[i]
// Check the subscription
if subscription.Name == "" {
return fmt.Errorf("empty 'name' found for subscription %d", i+1)
}
if subscription.Path == "" {
return fmt.Errorf("empty 'path' found for subscription %d", i+1)
}
// Support and convert legacy TagOnly subscriptions // Support and convert legacy TagOnly subscriptions
if subscription.TagOnly { if subscription.TagOnly {
tagSub := TagSubscription{ tagSub := TagSubscription{
@ -122,12 +138,12 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...) c.Subscriptions = append(c.Subscriptions[:i], c.Subscriptions[i+1:]...)
continue continue
} }
if err = subscription.buildFullPath(c); err != nil { if err := subscription.buildFullPath(c); err != nil {
return err return err
} }
} }
for idx := range c.TagSubscriptions { for idx := range c.TagSubscriptions {
if err = c.TagSubscriptions[idx].buildFullPath(c); err != nil { if err := c.TagSubscriptions[idx].buildFullPath(c); err != nil {
return err return err
} }
if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly { if c.TagSubscriptions[idx].TagOnly != c.TagSubscriptions[0].TagOnly {
@ -151,27 +167,8 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
} }
} }
// Validate configuration
if request, err = c.newSubscribeRequest(); err != nil {
return err
} else if time.Duration(c.Redial).Nanoseconds() <= 0 {
return fmt.Errorf("redial duration must be positive")
}
// Parse TLS config
if c.EnableTLS {
if tlscfg, err = c.ClientConfig.TLSConfig(); err != nil {
return err
}
}
if len(c.Username) > 0 {
ctx = metadata.AppendToOutgoingContext(ctx, "username", c.Username, "password", c.Password)
}
// Invert explicit alias list and prefill subscription names // Invert explicit alias list and prefill subscription names
c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions)) c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases)+len(c.TagSubscriptions))
for _, s := range c.Subscriptions { for _, s := range c.Subscriptions {
if err := s.buildAlias(c.internalAliases); err != nil { if err := s.buildAlias(c.internalAliases); err != nil {
return err return err
@ -182,12 +179,34 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
return err return err
} }
} }
for alias, encodingPath := range c.Aliases { for alias, encodingPath := range c.Aliases {
c.internalAliases[encodingPath] = alias c.internalAliases[encodingPath] = alias
} }
c.Log.Debugf("Internal alias mapping: %+v", c.internalAliases) c.Log.Debugf("Internal alias mapping: %+v", c.internalAliases)
return nil
}
func (c *GNMI) Start(acc telegraf.Accumulator) error {
// Validate configuration
request, err := c.newSubscribeRequest()
if err != nil {
return err
}
// Generate TLS config if enabled
tlscfg, err := c.ClientConfig.TLSConfig()
if err != nil {
return err
}
// Prepare the context, optionally with credentials
var ctx context.Context
ctx, c.cancel = context.WithCancel(context.Background())
if len(c.Username) > 0 {
ctx = metadata.AppendToOutgoingContext(ctx, "username", c.Username, "password", c.Password)
}
// Create a goroutine for each device, dial and subscribe // Create a goroutine for each device, dial and subscribe
c.wg.Add(len(c.Addresses)) c.wg.Add(len(c.Addresses))
for _, addr := range c.Addresses { for _, addr := range c.Addresses {
@ -321,14 +340,6 @@ func init() {
inputs.Add("cisco_telemetry_gnmi", New) inputs.Add("cisco_telemetry_gnmi", New)
} }
func (c *GNMI) Init() error {
// Check vendor_specific options configured by user
if err := choice.CheckSlice(c.VendorSpecific, supportedExtensions); err != nil {
return fmt.Errorf("unsupported vendor_specific option: %w", err)
}
return nil
}
func (s *Subscription) buildFullPath(c *GNMI) error { func (s *Subscription) buildFullPath(c *GNMI) error {
var err error var err error
if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil { if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil {

View File

@ -89,8 +89,8 @@ func TestWaitError(t *testing.T) {
} }
var acc testutil.Accumulator var acc testutil.Accumulator
err = plugin.Start(&acc) require.NoError(t, plugin.Init())
require.NoError(t, err) require.NoError(t, plugin.Start(&acc))
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
@ -148,8 +148,8 @@ func TestUsernamePassword(t *testing.T) {
} }
var acc testutil.Accumulator var acc testutil.Accumulator
err = plugin.Start(&acc) require.NoError(t, plugin.Init())
require.NoError(t, err) require.NoError(t, plugin.Start(&acc))
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
@ -1005,8 +1005,8 @@ func TestNotification(t *testing.T) {
gnmiLib.RegisterGNMIServer(grpcServer, tt.server) gnmiLib.RegisterGNMIServer(grpcServer, tt.server)
var acc testutil.Accumulator var acc testutil.Accumulator
err = tt.plugin.Start(&acc) require.NoError(t, tt.plugin.Init())
require.NoError(t, err) require.NoError(t, tt.plugin.Start(&acc))
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
@ -1069,8 +1069,8 @@ func TestRedial(t *testing.T) {
}() }()
var acc testutil.Accumulator var acc testutil.Accumulator
err = plugin.Start(&acc) require.NoError(t, plugin.Init())
require.NoError(t, err) require.NoError(t, plugin.Start(&acc))
acc.Wait(2) acc.Wait(2)
grpcServer.Stop() grpcServer.Stop()
@ -1205,6 +1205,7 @@ func TestCases(t *testing.T) {
}() }()
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(&acc)) require.NoError(t, plugin.Start(&acc))
require.Eventually(t, require.Eventually(t,