diff --git a/plugins/inputs/github/github.go b/plugins/inputs/github/github.go index ea5a2c177..cc595d975 100644 --- a/plugins/inputs/github/github.go +++ b/plugins/inputs/github/github.go @@ -23,53 +23,19 @@ import ( //go:embed sample.conf var sampleConfig string -// GitHub - plugin main structure type GitHub struct { Repositories []string `toml:"repositories"` AccessToken string `toml:"access_token"` AdditionalFields []string `toml:"additional_fields"` EnterpriseBaseURL string `toml:"enterprise_base_url"` HTTPTimeout config.Duration `toml:"http_timeout"` - githubClient *github.Client + githubClient *github.Client obfuscatedToken string - RateLimit selfstat.Stat - RateLimitErrors selfstat.Stat - RateRemaining selfstat.Stat -} - -// Create GitHub Client -func (g *GitHub) createGitHubClient(ctx context.Context) (*github.Client, error) { - httpClient := &http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - }, - Timeout: time.Duration(g.HTTPTimeout), - } - - g.obfuscatedToken = "Unauthenticated" - - if g.AccessToken != "" { - tokenSource := oauth2.StaticTokenSource( - &oauth2.Token{AccessToken: g.AccessToken}, - ) - oauthClient := oauth2.NewClient(ctx, tokenSource) - _ = context.WithValue(ctx, oauth2.HTTPClient, oauthClient) - - g.obfuscatedToken = g.AccessToken[0:4] + "..." + g.AccessToken[len(g.AccessToken)-3:] - - return g.newGithubClient(oauthClient) - } - - return g.newGithubClient(httpClient) -} - -func (g *GitHub) newGithubClient(httpClient *http.Client) (*github.Client, error) { - if g.EnterpriseBaseURL != "" { - return github.NewEnterpriseClient(g.EnterpriseBaseURL, "", httpClient) - } - return github.NewClient(httpClient), nil + rateLimit selfstat.Stat + rateLimitErrors selfstat.Stat + rateRemaining selfstat.Stat } func (*GitHub) SampleConfig() string { @@ -92,9 +58,9 @@ func (g *GitHub) Gather(acc telegraf.Accumulator) error { "access_token": g.obfuscatedToken, } - g.RateLimitErrors = selfstat.Register("github", "rate_limit_blocks", tokenTags) - g.RateLimit = selfstat.Register("github", "rate_limit_limit", tokenTags) - g.RateRemaining = selfstat.Register("github", "rate_limit_remaining", tokenTags) + g.rateLimitErrors = selfstat.Register("github", "rate_limit_blocks", tokenTags) + g.rateLimit = selfstat.Register("github", "rate_limit_limit", tokenTags) + g.rateRemaining = selfstat.Register("github", "rate_limit_remaining", tokenTags) } var wg sync.WaitGroup @@ -148,13 +114,45 @@ func (g *GitHub) Gather(acc telegraf.Accumulator) error { return nil } +func (g *GitHub) createGitHubClient(ctx context.Context) (*github.Client, error) { + httpClient := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }, + Timeout: time.Duration(g.HTTPTimeout), + } + + g.obfuscatedToken = "Unauthenticated" + + if g.AccessToken != "" { + tokenSource := oauth2.StaticTokenSource( + &oauth2.Token{AccessToken: g.AccessToken}, + ) + oauthClient := oauth2.NewClient(ctx, tokenSource) + _ = context.WithValue(ctx, oauth2.HTTPClient, oauthClient) + + g.obfuscatedToken = g.AccessToken[0:4] + "..." + g.AccessToken[len(g.AccessToken)-3:] + + return g.newGithubClient(oauthClient) + } + + return g.newGithubClient(httpClient) +} + +func (g *GitHub) newGithubClient(httpClient *http.Client) (*github.Client, error) { + if g.EnterpriseBaseURL != "" { + return github.NewEnterpriseClient(g.EnterpriseBaseURL, "", httpClient) + } + return github.NewClient(httpClient), nil +} + func (g *GitHub) handleRateLimit(response *github.Response, err error) { var rlErr *github.RateLimitError if err == nil { - g.RateLimit.Set(int64(response.Rate.Limit)) - g.RateRemaining.Set(int64(response.Rate.Remaining)) + g.rateLimit.Set(int64(response.Rate.Limit)) + g.rateRemaining.Set(int64(response.Rate.Remaining)) } else if errors.As(err, &rlErr) { - g.RateLimitErrors.Incr(1) + g.rateLimitErrors.Incr(1) } } diff --git a/plugins/inputs/gnmi/gnmi.go b/plugins/inputs/gnmi/gnmi.go index 9b5dac254..4dfaa8e4e 100644 --- a/plugins/inputs/gnmi/gnmi.go +++ b/plugins/inputs/gnmi/gnmi.go @@ -27,6 +27,9 @@ import ( //go:embed sample.conf var sampleConfig string +// Currently supported GNMI Extensions +var supportedExtensions = []string{"juniper_header"} + // Define the warning to show if we cannot get a metric name. const emptyNameWarning = `Got empty metric-name for response (field %q), usually indicating configuration issues as the response cannot be related to any @@ -35,14 +38,10 @@ including your device model and the following response data: %+v This message is only printed once.` -// Currently supported GNMI Extensions -var supportedExtensions = []string{"juniper_header"} - -// gNMI plugin instance type GNMI struct { Addresses []string `toml:"addresses"` - Subscriptions []Subscription `toml:"subscription"` - TagSubscriptions []TagSubscription `toml:"tag_subscription"` + Subscriptions []subscription `toml:"subscription"` + TagSubscriptions []tagSubscription `toml:"tag_subscription"` Aliases map[string]string `toml:"aliases"` Encoding string `toml:"encoding"` Origin string `toml:"origin"` @@ -74,8 +73,7 @@ type GNMI struct { wg sync.WaitGroup } -// Subscription for a gNMI client -type Subscription struct { +type subscription struct { Name string `toml:"name"` Origin string `toml:"origin"` Path string `toml:"path"` @@ -88,9 +86,8 @@ type Subscription struct { fullPath *gnmi.Path } -// Tag Subscription for a gNMI client -type TagSubscription struct { - Subscription +type tagSubscription struct { + subscription Match string `toml:"match"` Elements []string `toml:"elements"` } @@ -145,8 +142,8 @@ func (c *GNMI) Init() error { // Support and convert legacy TagOnly subscriptions if subscription.TagOnly { - tagSub := TagSubscription{ - Subscription: subscription, + tagSub := tagSubscription{ + subscription: subscription, Match: "name", } c.TagSubscriptions = append(c.TagSubscriptions, tagSub) @@ -310,7 +307,16 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { return nil } -func (s *Subscription) buildSubscription() (*gnmi.Subscription, error) { +func (c *GNMI) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (c *GNMI) Stop() { + c.cancel() + c.wg.Wait() +} + +func (s *subscription) buildSubscription() (*gnmi.Subscription, error) { gnmiPath, err := parsePath(s.Origin, s.Path, "") if err != nil { return nil, err @@ -387,31 +393,7 @@ func parsePath(origin, pathToParse, target string) (*gnmi.Path, error) { return gnmiPath, err } -// Stop listener and cleanup -func (c *GNMI) Stop() { - c.cancel() - c.wg.Wait() -} - -// Gather plugin measurements (unused) -func (c *GNMI) Gather(_ telegraf.Accumulator) error { - return nil -} - -func New() telegraf.Input { - return &GNMI{ - Encoding: "proto", - Redial: config.Duration(10 * time.Second), - } -} - -func init() { - inputs.Add("gnmi", New) - // Backwards compatible alias: - inputs.Add("cisco_telemetry_gnmi", New) -} - -func (s *Subscription) buildFullPath(c *GNMI) error { +func (s *subscription) buildFullPath(c *GNMI) error { var err error if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil { return err @@ -431,7 +413,7 @@ func (s *Subscription) buildFullPath(c *GNMI) error { return nil } -func (s *Subscription) buildAlias(aliases map[*pathInfo]string) error { +func (s *subscription) buildAlias(aliases map[*pathInfo]string) error { // Build the subscription path without keys path, err := parsePath(s.Origin, s.Path, "") if err != nil { @@ -449,3 +431,16 @@ func (s *Subscription) buildAlias(aliases map[*pathInfo]string) error { } return nil } + +func newGNMI() telegraf.Input { + return &GNMI{ + Encoding: "proto", + Redial: config.Duration(10 * time.Second), + } +} + +func init() { + inputs.Add("gnmi", newGNMI) + // Backwards compatible alias: + inputs.Add("cisco_telemetry_gnmi", newGNMI) +} diff --git a/plugins/inputs/gnmi/gnmi_test.go b/plugins/inputs/gnmi/gnmi_test.go index 3d32d9f67..4f0feae0f 100644 --- a/plugins/inputs/gnmi/gnmi_test.go +++ b/plugins/inputs/gnmi/gnmi_test.go @@ -46,25 +46,25 @@ func TestParsePath(t *testing.T) { require.Error(t, err) } -type MockServer struct { - SubscribeF func(gnmi.GNMI_SubscribeServer) error - GRPCServer *grpc.Server +type mockServer struct { + subscribeF func(gnmi.GNMI_SubscribeServer) error + grpcServer *grpc.Server } -func (s *MockServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) { +func (s *mockServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) { return nil, nil } -func (s *MockServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) { +func (s *mockServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) { return nil, nil } -func (s *MockServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) { +func (s *mockServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) { return nil, nil } -func (s *MockServer) Subscribe(server gnmi.GNMI_SubscribeServer) error { - return s.SubscribeF(server) +func (s *mockServer) Subscribe(server gnmi.GNMI_SubscribeServer) error { + return s.subscribeF(server) } func TestWaitError(t *testing.T) { @@ -72,11 +72,11 @@ func TestWaitError(t *testing.T) { require.NoError(t, err) grpcServer := grpc.NewServer() - gnmiServer := &MockServer{ - SubscribeF: func(gnmi.GNMI_SubscribeServer) error { + gnmiServer := &mockServer{ + subscribeF: func(gnmi.GNMI_SubscribeServer) error { return errors.New("testerror") }, - GRPCServer: grpcServer, + grpcServer: grpcServer, } gnmi.RegisterGNMIServer(grpcServer, gnmiServer) @@ -115,8 +115,8 @@ func TestUsernamePassword(t *testing.T) { require.NoError(t, err) grpcServer := grpc.NewServer() - gnmiServer := &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + gnmiServer := &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { metadata, ok := metadata.FromIncomingContext(server.Context()) if !ok { return errors.New("failed to get metadata") @@ -134,7 +134,7 @@ func TestUsernamePassword(t *testing.T) { return errors.New("success") }, - GRPCServer: grpcServer, + grpcServer: grpcServer, } gnmi.RegisterGNMIServer(grpcServer, gnmiServer) @@ -221,7 +221,7 @@ func TestNotification(t *testing.T) { tests := []struct { name string plugin *GNMI - server *MockServer + server *mockServer expected []telegraf.Metric }{ { @@ -230,7 +230,7 @@ func TestNotification(t *testing.T) { Log: testutil.Logger{}, Encoding: "proto", Redial: config.Duration(1 * time.Second), - Subscriptions: []Subscription{ + Subscriptions: []subscription{ { Name: "alias", Origin: "type", @@ -239,8 +239,8 @@ func TestNotification(t *testing.T) { }, }, }, - server: &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + server: &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { notification := mockGNMINotification() err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) if err != nil { @@ -319,7 +319,7 @@ func TestNotification(t *testing.T) { Log: testutil.Logger{}, Encoding: "proto", Redial: config.Duration(1 * time.Second), - Subscriptions: []Subscription{ + Subscriptions: []subscription{ { Name: "PHY_COUNTERS", Origin: "type", @@ -328,8 +328,8 @@ func TestNotification(t *testing.T) { }, }, }, - server: &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + server: &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { response := &gnmi.SubscribeResponse{ Response: &gnmi.SubscribeResponse_Update{ Update: &gnmi.Notification{ @@ -388,7 +388,7 @@ func TestNotification(t *testing.T) { Log: testutil.Logger{}, Encoding: "proto", Redial: config.Duration(1 * time.Second), - Subscriptions: []Subscription{ + Subscriptions: []subscription{ { Name: "oc-intf-desc", Origin: "openconfig-interfaces", @@ -404,8 +404,8 @@ func TestNotification(t *testing.T) { }, }, }, - server: &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + server: &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { tagResponse := &gnmi.SubscribeResponse{ Response: &gnmi.SubscribeResponse_Update{ Update: &gnmi.Notification{ @@ -507,9 +507,9 @@ func TestNotification(t *testing.T) { Log: testutil.Logger{}, Encoding: "proto", Redial: config.Duration(1 * time.Second), - TagSubscriptions: []TagSubscription{ + TagSubscriptions: []tagSubscription{ { - Subscription: Subscription{ + subscription: subscription{ Name: "oc-neigh-desc", Origin: "openconfig", Path: "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/description", @@ -518,7 +518,7 @@ func TestNotification(t *testing.T) { Elements: []string{"network-instance", "protocol", "neighbor"}, }, }, - Subscriptions: []Subscription{ + Subscriptions: []subscription{ { Name: "oc-neigh-state", Origin: "openconfig", @@ -527,8 +527,8 @@ func TestNotification(t *testing.T) { }, }, }, - server: &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + server: &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { tagResponse := &gnmi.SubscribeResponse{ Response: &gnmi.SubscribeResponse_Update{ Update: &gnmi.Notification{ @@ -665,7 +665,7 @@ func TestNotification(t *testing.T) { Log: testutil.Logger{}, Encoding: "proto", Redial: config.Duration(1 * time.Second), - Subscriptions: []Subscription{ + Subscriptions: []subscription{ { Name: "interfaces", Origin: "openconfig", @@ -675,8 +675,8 @@ func TestNotification(t *testing.T) { }, }, }, - server: &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + server: &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil { return err } @@ -782,7 +782,7 @@ func TestNotification(t *testing.T) { Log: testutil.Logger{}, Encoding: "proto", Redial: config.Duration(1 * time.Second), - Subscriptions: []Subscription{ + Subscriptions: []subscription{ { Name: "temperature", Origin: "openconfig-platform", @@ -792,8 +792,8 @@ func TestNotification(t *testing.T) { }, }, }, - server: &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + server: &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil { return err } @@ -914,7 +914,7 @@ func TestNotification(t *testing.T) { Encoding: "proto", VendorSpecific: []string{"juniper_header"}, Redial: config.Duration(1 * time.Second), - Subscriptions: []Subscription{ + Subscriptions: []subscription{ { Name: "type", Origin: "openconfig-platform", @@ -924,8 +924,8 @@ func TestNotification(t *testing.T) { }, }, }, - server: &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + server: &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil { return err } @@ -1002,7 +1002,7 @@ func TestNotification(t *testing.T) { tt.plugin.Addresses = []string{listener.Addr().String()} grpcServer := grpc.NewServer() - tt.server.GRPCServer = grpcServer + tt.server.grpcServer = grpcServer gnmi.RegisterGNMIServer(grpcServer, tt.server) var acc testutil.Accumulator @@ -1029,17 +1029,6 @@ func TestNotification(t *testing.T) { } } -type MockLogger struct { - telegraf.Logger - lastFormat string - lastArgs []interface{} -} - -func (l *MockLogger) Errorf(format string, args ...interface{}) { - l.lastFormat = format - l.lastArgs = args -} - func TestRedial(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) @@ -1053,12 +1042,12 @@ func TestRedial(t *testing.T) { } grpcServer := grpc.NewServer() - gnmiServer := &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + gnmiServer := &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { notification := mockGNMINotification() return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) }, - GRPCServer: grpcServer, + grpcServer: grpcServer, } gnmi.RegisterGNMIServer(grpcServer, gnmiServer) @@ -1084,15 +1073,15 @@ func TestRedial(t *testing.T) { require.NoError(t, err) grpcServer = grpc.NewServer() - gnmiServer = &MockServer{ - SubscribeF: func(server gnmi.GNMI_SubscribeServer) error { + gnmiServer = &mockServer{ + subscribeF: func(server gnmi.GNMI_SubscribeServer) error { notification := mockGNMINotification() notification.Prefix.Elem[0].Key["foo"] = "bar2" notification.Update[0].Path.Elem[1].Key["name"] = "str2" notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}} return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}}) }, - GRPCServer: grpcServer, + grpcServer: grpcServer, } gnmi.RegisterGNMIServer(grpcServer, gnmiServer) @@ -1116,7 +1105,7 @@ func TestCases(t *testing.T) { require.NoError(t, err) // Register the plugin - inputs.Add("gnmi", New) + inputs.Add("gnmi", newGNMI) for _, f := range folders { // Only handle folders @@ -1188,9 +1177,9 @@ func TestCases(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) grpcServer := grpc.NewServer() - gnmiServer := &MockServer{ - SubscribeF: responseFunction, - GRPCServer: grpcServer, + gnmiServer := &mockServer{ + subscribeF: responseFunction, + grpcServer: grpcServer, } gnmi.RegisterGNMIServer(grpcServer, gnmiServer) diff --git a/plugins/inputs/gnmi/handler.go b/plugins/inputs/gnmi/handler.go index 7ff683e8e..a7dad5928 100644 --- a/plugins/inputs/gnmi/handler.go +++ b/plugins/inputs/gnmi/handler.go @@ -35,7 +35,7 @@ const eidJuniperTelemetryHeader = 1 type handler struct { address string aliases map[*pathInfo]string - tagsubs []TagSubscription + tagsubs []tagSubscription maxMsgSize int emptyNameWarnShown bool vendorExt []string @@ -170,7 +170,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon h.log.Errorf("unable to parse address %s: %v", h.address, err) } if !prefix.empty() { - headerTags["path"] = prefix.FullPath() + headerTags["path"] = prefix.fullPath() } // Process and remove tag-updates from the response first so we can @@ -192,7 +192,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon for key, val := range headerTags { tags[key] = val } - for key, val := range fullPath.Tags(h.tagPathPrefix) { + for key, val := range fullPath.tags(h.tagPathPrefix) { tags[key] = val } @@ -229,7 +229,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon } // Prepare tags from prefix - fieldTags := field.path.Tags(h.tagPathPrefix) + fieldTags := field.path.tags(h.tagPathPrefix) tags := make(map[string]string, len(headerTags)+len(fieldTags)) for key, val := range headerTags { tags[key] = val @@ -278,7 +278,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon key = relative } else { // Otherwise use the last path element as the field key - key = field.path.Base() + key = field.path.base() } key = strings.ReplaceAll(key, "-", "_") } @@ -328,7 +328,7 @@ func guessPrefixFromUpdate(fields []updateField) string { return "" } if len(fields) == 1 { - return fields[0].path.Dir() + return fields[0].path.dir() } commonPath := &pathInfo{ origin: fields[0].path.origin, diff --git a/plugins/inputs/gnmi/path.go b/plugins/inputs/gnmi/path.go index 8f2f78224..70b2ba77d 100644 --- a/plugins/inputs/gnmi/path.go +++ b/plugins/inputs/gnmi/path.go @@ -290,7 +290,7 @@ func (pi *pathInfo) keepCommonPart(path *pathInfo) { pi.segments = pi.segments[:matchLen] } -func (pi *pathInfo) Dir() string { +func (pi *pathInfo) dir() string { if len(pi.segments) <= 1 { return "" } @@ -309,7 +309,7 @@ func (pi *pathInfo) Dir() string { return dir } -func (pi *pathInfo) Base() string { +func (pi *pathInfo) base() string { if len(pi.segments) == 0 { return "" } @@ -321,7 +321,7 @@ func (pi *pathInfo) Base() string { return s.id } -func (pi *pathInfo) Path() (origin, path string) { +func (pi *pathInfo) path() (origin, path string) { if len(pi.segments) == 0 { return pi.origin, "/" } @@ -333,7 +333,7 @@ func (pi *pathInfo) Path() (origin, path string) { return pi.origin, path } -func (pi *pathInfo) FullPath() string { +func (pi *pathInfo) fullPath() string { var path string if pi.origin != "" { path = pi.origin + ":" @@ -360,14 +360,14 @@ func (pi *pathInfo) String() string { return "" } - origin, path := pi.Path() + origin, path := pi.path() if origin != "" { return origin + ":" + path } return path } -func (pi *pathInfo) Tags(pathPrefix bool) map[string]string { +func (pi *pathInfo) tags(pathPrefix bool) map[string]string { tags := make(map[string]string, len(pi.keyValues)) for _, s := range pi.keyValues { var prefix string diff --git a/plugins/inputs/gnmi/tag_store.go b/plugins/inputs/gnmi/tag_store.go index c6eafc188..af6b2b55f 100644 --- a/plugins/inputs/gnmi/tag_store.go +++ b/plugins/inputs/gnmi/tag_store.go @@ -19,7 +19,7 @@ type elementsStore struct { tags map[string]map[string]string } -func newTagStore(subs []TagSubscription) *tagStore { +func newTagStore(subs []tagSubscription) *tagStore { store := tagStore{ unconditional: make(map[string]string), names: make(map[string]map[string]string), @@ -38,13 +38,13 @@ func newTagStore(subs []TagSubscription) *tagStore { } // Store tags extracted from TagSubscriptions -func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values []updateField, tags map[string]string) error { +func (s *tagStore) insert(subscription tagSubscription, path *pathInfo, values []updateField, tags map[string]string) error { switch subscription.Match { case "unconditional": for _, f := range values { tagName := subscription.Name if len(f.path.segments) > 0 { - key := f.path.Base() + key := f.path.base() key = strings.ReplaceAll(key, "-", "_") tagName += "/" + key } @@ -74,7 +74,7 @@ func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values [ for _, f := range values { tagName := subscription.Name if len(f.path.segments) > 0 { - key := f.path.Base() + key := f.path.base() key = strings.ReplaceAll(key, "-", "_") tagName += "/" + key } @@ -103,7 +103,7 @@ func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values [ for _, f := range values { tagName := subscription.Name if len(f.path.segments) > 0 { - key := f.path.Base() + key := f.path.base() key = strings.ReplaceAll(key, "-", "_") tagName += "/" + key } diff --git a/plugins/inputs/gnmi/update_fields.go b/plugins/inputs/gnmi/update_fields.go index faf82d5eb..b5202783b 100644 --- a/plugins/inputs/gnmi/update_fields.go +++ b/plugins/inputs/gnmi/update_fields.go @@ -109,7 +109,7 @@ func (h *handler) processJSONIETF(path *pathInfo, data []byte) ([]updateField, e // Try to lookup the full path to decode the field according to the // YANG model if any if h.decoder != nil { - origin, fieldPath := p.Path() + origin, fieldPath := p.path() if decoded, err := h.decoder.DecodePathElement(origin, fieldPath, entry.value); err != nil { h.log.Debugf("Decoding %s failed: %v", p, err) } else { diff --git a/plugins/inputs/google_cloud_storage/google_cloud_storage.go b/plugins/inputs/google_cloud_storage/google_cloud_storage.go index c97ad2061..920e66b8e 100644 --- a/plugins/inputs/google_cloud_storage/google_cloud_storage.go +++ b/plugins/inputs/google_cloud_storage/google_cloud_storage.go @@ -29,36 +29,32 @@ const ( var sampleConfig string type GCS struct { - CredentialsFile string `toml:"credentials_file"` - Bucket string `toml:"bucket"` - - Prefix string `toml:"key_prefix"` - OffsetKey string `toml:"offset_key"` - ObjectsPerIteration int `toml:"objects_per_iteration"` - - Log telegraf.Logger - offSet OffSet + CredentialsFile string `toml:"credentials_file"` + Bucket string `toml:"bucket"` + Prefix string `toml:"key_prefix"` + OffsetKey string `toml:"offset_key"` + ObjectsPerIteration int `toml:"objects_per_iteration"` + Log telegraf.Logger `toml:"-"` + offSet offSet parser telegraf.Parser client *storage.Client - - ctx context.Context + ctx context.Context } -type OffSet struct { +type offSet struct { OffSet string `json:"offSet"` } -func NewEmptyOffset() *OffSet { - return &OffSet{OffSet: ""} -} +func (gcs *GCS) Init() error { + gcs.ctx = context.Background() + err := gcs.setUpClient() + if err != nil { + gcs.Log.Error("Could not create client", err) + return err + } -func NewOffset(offset string) *OffSet { - return &OffSet{OffSet: offset} -} - -func (offSet *OffSet) isPresent() bool { - return offSet.OffSet != "" + return gcs.setOffset() } func (gcs *GCS) SampleConfig() string { @@ -163,7 +159,7 @@ func (gcs *GCS) updateOffset(bucket *storage.BucketHandle, name string) error { return nil } - offsetModel := NewOffset(name) + offsetModel := newOffset(name) marshalled, err := json.Marshal(offsetModel) if err != nil { @@ -184,17 +180,6 @@ func (gcs *GCS) updateOffset(bucket *storage.BucketHandle, name string) error { return nil } -func (gcs *GCS) Init() error { - gcs.ctx = context.Background() - err := gcs.setUpClient() - if err != nil { - gcs.Log.Error("Could not create client", err) - return err - } - - return gcs.setOffset() -} - func (gcs *GCS) setUpClient() error { if endpoint, present := os.LookupEnv(emulatorHostEnv); present { return gcs.setUpLocalClient(endpoint) @@ -250,7 +235,7 @@ func (gcs *GCS) setOffset() error { btk := gcs.client.Bucket(gcs.Bucket) obj := btk.Object(gcs.OffsetKey) - var offSet OffSet + var offSet offSet if r, err := obj.NewReader(gcs.ctx); err == nil { defer gcs.closeReader(r) @@ -262,7 +247,7 @@ func (gcs *GCS) setOffset() error { } } } else { - offSet = *NewEmptyOffset() + offSet = *newEmptyOffset() } gcs.offSet = offSet @@ -270,15 +255,27 @@ func (gcs *GCS) setOffset() error { return nil } +func (gcs *GCS) closeReader(r *storage.Reader) { + if err := r.Close(); err != nil { + gcs.Log.Errorf("Could not close reader: %v", err) + } +} + +func newEmptyOffset() *offSet { + return &offSet{OffSet: ""} +} + +func newOffset(offset string) *offSet { + return &offSet{OffSet: offset} +} + +func (offSet *offSet) isPresent() bool { + return offSet.OffSet != "" +} + func init() { inputs.Add("google_cloud_storage", func() telegraf.Input { gcs := &GCS{} return gcs }) } - -func (gcs *GCS) closeReader(r *storage.Reader) { - if err := r.Close(); err != nil { - gcs.Log.Errorf("Could not close reader: %v", err) - } -} diff --git a/plugins/inputs/graylog/graylog.go b/plugins/inputs/graylog/graylog.go index 6ee9da071..cc36a4bd3 100644 --- a/plugins/inputs/graylog/graylog.go +++ b/plugins/inputs/graylog/graylog.go @@ -32,7 +32,7 @@ type GrayLog struct { Timeout config.Duration `toml:"timeout"` tls.ClientConfig - client HTTPClient + client httpClient } type responseMetrics struct { @@ -54,7 +54,7 @@ type realHTTPClient struct { client *http.Client } -type HTTPClient interface { +type httpClient interface { // Returns the result of an http request // // Parameters: @@ -63,21 +63,20 @@ type HTTPClient interface { // Returns: // http.Response: HTTP response object // error : Any error that may have occurred - MakeRequest(req *http.Request) (*http.Response, error) - - SetHTTPClient(client *http.Client) - HTTPClient() *http.Client + makeRequest(req *http.Request) (*http.Response, error) + setHTTPClient(client *http.Client) + httpClient() *http.Client } -func (c *realHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { +func (c *realHTTPClient) makeRequest(req *http.Request) (*http.Response, error) { return c.client.Do(req) } -func (c *realHTTPClient) SetHTTPClient(client *http.Client) { +func (c *realHTTPClient) setHTTPClient(client *http.Client) { c.client = client } -func (c *realHTTPClient) HTTPClient() *http.Client { +func (c *realHTTPClient) httpClient() *http.Client { return c.client } @@ -85,11 +84,10 @@ func (*GrayLog) SampleConfig() string { return sampleConfig } -// Gathers data for all servers. func (h *GrayLog) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - if h.client.HTTPClient() == nil { + if h.client.httpClient() == nil { tlsCfg, err := h.ClientConfig.TLSConfig() if err != nil { return err @@ -102,7 +100,7 @@ func (h *GrayLog) Gather(acc telegraf.Accumulator) error { Transport: tr, Timeout: time.Duration(h.Timeout), } - h.client.SetHTTPClient(client) + h.client.setHTTPClient(client) } for _, server := range h.Servers { @@ -190,7 +188,7 @@ func (h *GrayLog) flatten(item, fields map[string]interface{}, id string) { } } -// Sends an HTTP request to the server using the GrayLog object's HTTPClient. +// Sends an HTTP request to the server using the GrayLog object's httpClient. // Parameters: // // serverURL: endpoint to send request to @@ -233,7 +231,7 @@ func (h *GrayLog) sendRequest(serverURL string) (string, float64, error) { req.Header.Add(k, v) } start := time.Now() - resp, err := h.client.MakeRequest(req) + resp, err := h.client.makeRequest(req) if err != nil { return "", -1, err } diff --git a/plugins/inputs/graylog/graylog_test.go b/plugins/inputs/graylog/graylog_test.go index 04337ed9c..0662dc058 100644 --- a/plugins/inputs/graylog/graylog_test.go +++ b/plugins/inputs/graylog/graylog_test.go @@ -97,7 +97,7 @@ type mockHTTPClient struct { // Mock implementation of MakeRequest. Usually returns an http.Response with // hard-coded responseBody and statusCode. However, if the request uses a // nonstandard method, it uses status code 405 (method not allowed) -func (c *mockHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { +func (c *mockHTTPClient) makeRequest(req *http.Request) (*http.Response, error) { resp := http.Response{} resp.StatusCode = c.statusCode @@ -119,10 +119,10 @@ func (c *mockHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) return &resp, nil } -func (c *mockHTTPClient) SetHTTPClient(_ *http.Client) { +func (c *mockHTTPClient) setHTTPClient(_ *http.Client) { } -func (c *mockHTTPClient) HTTPClient() *http.Client { +func (c *mockHTTPClient) httpClient() *http.Client { return nil }