chore: Fix linter findings for `revive:exported` in `plugins/inputs/g*` (#16049)
This commit is contained in:
parent
7dc397a830
commit
9152fcc0d1
|
|
@ -23,53 +23,19 @@ import (
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
// GitHub - plugin main structure
|
|
||||||
type GitHub struct {
|
type GitHub struct {
|
||||||
Repositories []string `toml:"repositories"`
|
Repositories []string `toml:"repositories"`
|
||||||
AccessToken string `toml:"access_token"`
|
AccessToken string `toml:"access_token"`
|
||||||
AdditionalFields []string `toml:"additional_fields"`
|
AdditionalFields []string `toml:"additional_fields"`
|
||||||
EnterpriseBaseURL string `toml:"enterprise_base_url"`
|
EnterpriseBaseURL string `toml:"enterprise_base_url"`
|
||||||
HTTPTimeout config.Duration `toml:"http_timeout"`
|
HTTPTimeout config.Duration `toml:"http_timeout"`
|
||||||
githubClient *github.Client
|
|
||||||
|
|
||||||
|
githubClient *github.Client
|
||||||
obfuscatedToken string
|
obfuscatedToken string
|
||||||
|
|
||||||
RateLimit selfstat.Stat
|
rateLimit selfstat.Stat
|
||||||
RateLimitErrors selfstat.Stat
|
rateLimitErrors selfstat.Stat
|
||||||
RateRemaining selfstat.Stat
|
rateRemaining selfstat.Stat
|
||||||
}
|
|
||||||
|
|
||||||
// Create GitHub Client
|
|
||||||
func (g *GitHub) createGitHubClient(ctx context.Context) (*github.Client, error) {
|
|
||||||
httpClient := &http.Client{
|
|
||||||
Transport: &http.Transport{
|
|
||||||
Proxy: http.ProxyFromEnvironment,
|
|
||||||
},
|
|
||||||
Timeout: time.Duration(g.HTTPTimeout),
|
|
||||||
}
|
|
||||||
|
|
||||||
g.obfuscatedToken = "Unauthenticated"
|
|
||||||
|
|
||||||
if g.AccessToken != "" {
|
|
||||||
tokenSource := oauth2.StaticTokenSource(
|
|
||||||
&oauth2.Token{AccessToken: g.AccessToken},
|
|
||||||
)
|
|
||||||
oauthClient := oauth2.NewClient(ctx, tokenSource)
|
|
||||||
_ = context.WithValue(ctx, oauth2.HTTPClient, oauthClient)
|
|
||||||
|
|
||||||
g.obfuscatedToken = g.AccessToken[0:4] + "..." + g.AccessToken[len(g.AccessToken)-3:]
|
|
||||||
|
|
||||||
return g.newGithubClient(oauthClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
return g.newGithubClient(httpClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *GitHub) newGithubClient(httpClient *http.Client) (*github.Client, error) {
|
|
||||||
if g.EnterpriseBaseURL != "" {
|
|
||||||
return github.NewEnterpriseClient(g.EnterpriseBaseURL, "", httpClient)
|
|
||||||
}
|
|
||||||
return github.NewClient(httpClient), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*GitHub) SampleConfig() string {
|
func (*GitHub) SampleConfig() string {
|
||||||
|
|
@ -92,9 +58,9 @@ func (g *GitHub) Gather(acc telegraf.Accumulator) error {
|
||||||
"access_token": g.obfuscatedToken,
|
"access_token": g.obfuscatedToken,
|
||||||
}
|
}
|
||||||
|
|
||||||
g.RateLimitErrors = selfstat.Register("github", "rate_limit_blocks", tokenTags)
|
g.rateLimitErrors = selfstat.Register("github", "rate_limit_blocks", tokenTags)
|
||||||
g.RateLimit = selfstat.Register("github", "rate_limit_limit", tokenTags)
|
g.rateLimit = selfstat.Register("github", "rate_limit_limit", tokenTags)
|
||||||
g.RateRemaining = selfstat.Register("github", "rate_limit_remaining", tokenTags)
|
g.rateRemaining = selfstat.Register("github", "rate_limit_remaining", tokenTags)
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
@ -148,13 +114,45 @@ func (g *GitHub) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GitHub) createGitHubClient(ctx context.Context) (*github.Client, error) {
|
||||||
|
httpClient := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
},
|
||||||
|
Timeout: time.Duration(g.HTTPTimeout),
|
||||||
|
}
|
||||||
|
|
||||||
|
g.obfuscatedToken = "Unauthenticated"
|
||||||
|
|
||||||
|
if g.AccessToken != "" {
|
||||||
|
tokenSource := oauth2.StaticTokenSource(
|
||||||
|
&oauth2.Token{AccessToken: g.AccessToken},
|
||||||
|
)
|
||||||
|
oauthClient := oauth2.NewClient(ctx, tokenSource)
|
||||||
|
_ = context.WithValue(ctx, oauth2.HTTPClient, oauthClient)
|
||||||
|
|
||||||
|
g.obfuscatedToken = g.AccessToken[0:4] + "..." + g.AccessToken[len(g.AccessToken)-3:]
|
||||||
|
|
||||||
|
return g.newGithubClient(oauthClient)
|
||||||
|
}
|
||||||
|
|
||||||
|
return g.newGithubClient(httpClient)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GitHub) newGithubClient(httpClient *http.Client) (*github.Client, error) {
|
||||||
|
if g.EnterpriseBaseURL != "" {
|
||||||
|
return github.NewEnterpriseClient(g.EnterpriseBaseURL, "", httpClient)
|
||||||
|
}
|
||||||
|
return github.NewClient(httpClient), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g *GitHub) handleRateLimit(response *github.Response, err error) {
|
func (g *GitHub) handleRateLimit(response *github.Response, err error) {
|
||||||
var rlErr *github.RateLimitError
|
var rlErr *github.RateLimitError
|
||||||
if err == nil {
|
if err == nil {
|
||||||
g.RateLimit.Set(int64(response.Rate.Limit))
|
g.rateLimit.Set(int64(response.Rate.Limit))
|
||||||
g.RateRemaining.Set(int64(response.Rate.Remaining))
|
g.rateRemaining.Set(int64(response.Rate.Remaining))
|
||||||
} else if errors.As(err, &rlErr) {
|
} else if errors.As(err, &rlErr) {
|
||||||
g.RateLimitErrors.Incr(1)
|
g.rateLimitErrors.Incr(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,9 @@ import (
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
|
// Currently supported GNMI Extensions
|
||||||
|
var supportedExtensions = []string{"juniper_header"}
|
||||||
|
|
||||||
// Define the warning to show if we cannot get a metric name.
|
// Define the warning to show if we cannot get a metric name.
|
||||||
const emptyNameWarning = `Got empty metric-name for response (field %q), usually
|
const emptyNameWarning = `Got empty metric-name for response (field %q), usually
|
||||||
indicating configuration issues as the response cannot be related to any
|
indicating configuration issues as the response cannot be related to any
|
||||||
|
|
@ -35,14 +38,10 @@ including your device model and the following response data:
|
||||||
%+v
|
%+v
|
||||||
This message is only printed once.`
|
This message is only printed once.`
|
||||||
|
|
||||||
// Currently supported GNMI Extensions
|
|
||||||
var supportedExtensions = []string{"juniper_header"}
|
|
||||||
|
|
||||||
// gNMI plugin instance
|
|
||||||
type GNMI struct {
|
type GNMI struct {
|
||||||
Addresses []string `toml:"addresses"`
|
Addresses []string `toml:"addresses"`
|
||||||
Subscriptions []Subscription `toml:"subscription"`
|
Subscriptions []subscription `toml:"subscription"`
|
||||||
TagSubscriptions []TagSubscription `toml:"tag_subscription"`
|
TagSubscriptions []tagSubscription `toml:"tag_subscription"`
|
||||||
Aliases map[string]string `toml:"aliases"`
|
Aliases map[string]string `toml:"aliases"`
|
||||||
Encoding string `toml:"encoding"`
|
Encoding string `toml:"encoding"`
|
||||||
Origin string `toml:"origin"`
|
Origin string `toml:"origin"`
|
||||||
|
|
@ -74,8 +73,7 @@ type GNMI struct {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscription for a gNMI client
|
type subscription struct {
|
||||||
type Subscription struct {
|
|
||||||
Name string `toml:"name"`
|
Name string `toml:"name"`
|
||||||
Origin string `toml:"origin"`
|
Origin string `toml:"origin"`
|
||||||
Path string `toml:"path"`
|
Path string `toml:"path"`
|
||||||
|
|
@ -88,9 +86,8 @@ type Subscription struct {
|
||||||
fullPath *gnmi.Path
|
fullPath *gnmi.Path
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tag Subscription for a gNMI client
|
type tagSubscription struct {
|
||||||
type TagSubscription struct {
|
subscription
|
||||||
Subscription
|
|
||||||
Match string `toml:"match"`
|
Match string `toml:"match"`
|
||||||
Elements []string `toml:"elements"`
|
Elements []string `toml:"elements"`
|
||||||
}
|
}
|
||||||
|
|
@ -145,8 +142,8 @@ func (c *GNMI) Init() error {
|
||||||
|
|
||||||
// Support and convert legacy TagOnly subscriptions
|
// Support and convert legacy TagOnly subscriptions
|
||||||
if subscription.TagOnly {
|
if subscription.TagOnly {
|
||||||
tagSub := TagSubscription{
|
tagSub := tagSubscription{
|
||||||
Subscription: subscription,
|
subscription: subscription,
|
||||||
Match: "name",
|
Match: "name",
|
||||||
}
|
}
|
||||||
c.TagSubscriptions = append(c.TagSubscriptions, tagSub)
|
c.TagSubscriptions = append(c.TagSubscriptions, tagSub)
|
||||||
|
|
@ -310,7 +307,16 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscription) buildSubscription() (*gnmi.Subscription, error) {
|
func (c *GNMI) Gather(_ telegraf.Accumulator) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *GNMI) Stop() {
|
||||||
|
c.cancel()
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *subscription) buildSubscription() (*gnmi.Subscription, error) {
|
||||||
gnmiPath, err := parsePath(s.Origin, s.Path, "")
|
gnmiPath, err := parsePath(s.Origin, s.Path, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -387,31 +393,7 @@ func parsePath(origin, pathToParse, target string) (*gnmi.Path, error) {
|
||||||
return gnmiPath, err
|
return gnmiPath, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop listener and cleanup
|
func (s *subscription) buildFullPath(c *GNMI) error {
|
||||||
func (c *GNMI) Stop() {
|
|
||||||
c.cancel()
|
|
||||||
c.wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gather plugin measurements (unused)
|
|
||||||
func (c *GNMI) Gather(_ telegraf.Accumulator) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func New() telegraf.Input {
|
|
||||||
return &GNMI{
|
|
||||||
Encoding: "proto",
|
|
||||||
Redial: config.Duration(10 * time.Second),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
inputs.Add("gnmi", New)
|
|
||||||
// Backwards compatible alias:
|
|
||||||
inputs.Add("cisco_telemetry_gnmi", New)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Subscription) buildFullPath(c *GNMI) error {
|
|
||||||
var err error
|
var err error
|
||||||
if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil {
|
if s.fullPath, err = xpath.ToGNMIPath(s.Path); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -431,7 +413,7 @@ func (s *Subscription) buildFullPath(c *GNMI) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscription) buildAlias(aliases map[*pathInfo]string) error {
|
func (s *subscription) buildAlias(aliases map[*pathInfo]string) error {
|
||||||
// Build the subscription path without keys
|
// Build the subscription path without keys
|
||||||
path, err := parsePath(s.Origin, s.Path, "")
|
path, err := parsePath(s.Origin, s.Path, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -449,3 +431,16 @@ func (s *Subscription) buildAlias(aliases map[*pathInfo]string) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newGNMI() telegraf.Input {
|
||||||
|
return &GNMI{
|
||||||
|
Encoding: "proto",
|
||||||
|
Redial: config.Duration(10 * time.Second),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("gnmi", newGNMI)
|
||||||
|
// Backwards compatible alias:
|
||||||
|
inputs.Add("cisco_telemetry_gnmi", newGNMI)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -46,25 +46,25 @@ func TestParsePath(t *testing.T) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
type MockServer struct {
|
type mockServer struct {
|
||||||
SubscribeF func(gnmi.GNMI_SubscribeServer) error
|
subscribeF func(gnmi.GNMI_SubscribeServer) error
|
||||||
GRPCServer *grpc.Server
|
grpcServer *grpc.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MockServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) {
|
func (s *mockServer) Capabilities(context.Context, *gnmi.CapabilityRequest) (*gnmi.CapabilityResponse, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MockServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) {
|
func (s *mockServer) Get(context.Context, *gnmi.GetRequest) (*gnmi.GetResponse, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MockServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) {
|
func (s *mockServer) Set(context.Context, *gnmi.SetRequest) (*gnmi.SetResponse, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MockServer) Subscribe(server gnmi.GNMI_SubscribeServer) error {
|
func (s *mockServer) Subscribe(server gnmi.GNMI_SubscribeServer) error {
|
||||||
return s.SubscribeF(server)
|
return s.subscribeF(server)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWaitError(t *testing.T) {
|
func TestWaitError(t *testing.T) {
|
||||||
|
|
@ -72,11 +72,11 @@ func TestWaitError(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
gnmiServer := &MockServer{
|
gnmiServer := &mockServer{
|
||||||
SubscribeF: func(gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(gnmi.GNMI_SubscribeServer) error {
|
||||||
return errors.New("testerror")
|
return errors.New("testerror")
|
||||||
},
|
},
|
||||||
GRPCServer: grpcServer,
|
grpcServer: grpcServer,
|
||||||
}
|
}
|
||||||
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
||||||
|
|
||||||
|
|
@ -115,8 +115,8 @@ func TestUsernamePassword(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
gnmiServer := &MockServer{
|
gnmiServer := &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
metadata, ok := metadata.FromIncomingContext(server.Context())
|
metadata, ok := metadata.FromIncomingContext(server.Context())
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("failed to get metadata")
|
return errors.New("failed to get metadata")
|
||||||
|
|
@ -134,7 +134,7 @@ func TestUsernamePassword(t *testing.T) {
|
||||||
|
|
||||||
return errors.New("success")
|
return errors.New("success")
|
||||||
},
|
},
|
||||||
GRPCServer: grpcServer,
|
grpcServer: grpcServer,
|
||||||
}
|
}
|
||||||
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
||||||
|
|
||||||
|
|
@ -221,7 +221,7 @@ func TestNotification(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
plugin *GNMI
|
plugin *GNMI
|
||||||
server *MockServer
|
server *mockServer
|
||||||
expected []telegraf.Metric
|
expected []telegraf.Metric
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
|
@ -230,7 +230,7 @@ func TestNotification(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
Encoding: "proto",
|
Encoding: "proto",
|
||||||
Redial: config.Duration(1 * time.Second),
|
Redial: config.Duration(1 * time.Second),
|
||||||
Subscriptions: []Subscription{
|
Subscriptions: []subscription{
|
||||||
{
|
{
|
||||||
Name: "alias",
|
Name: "alias",
|
||||||
Origin: "type",
|
Origin: "type",
|
||||||
|
|
@ -239,8 +239,8 @@ func TestNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
server: &MockServer{
|
server: &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
notification := mockGNMINotification()
|
notification := mockGNMINotification()
|
||||||
err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
|
err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -319,7 +319,7 @@ func TestNotification(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
Encoding: "proto",
|
Encoding: "proto",
|
||||||
Redial: config.Duration(1 * time.Second),
|
Redial: config.Duration(1 * time.Second),
|
||||||
Subscriptions: []Subscription{
|
Subscriptions: []subscription{
|
||||||
{
|
{
|
||||||
Name: "PHY_COUNTERS",
|
Name: "PHY_COUNTERS",
|
||||||
Origin: "type",
|
Origin: "type",
|
||||||
|
|
@ -328,8 +328,8 @@ func TestNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
server: &MockServer{
|
server: &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
response := &gnmi.SubscribeResponse{
|
response := &gnmi.SubscribeResponse{
|
||||||
Response: &gnmi.SubscribeResponse_Update{
|
Response: &gnmi.SubscribeResponse_Update{
|
||||||
Update: &gnmi.Notification{
|
Update: &gnmi.Notification{
|
||||||
|
|
@ -388,7 +388,7 @@ func TestNotification(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
Encoding: "proto",
|
Encoding: "proto",
|
||||||
Redial: config.Duration(1 * time.Second),
|
Redial: config.Duration(1 * time.Second),
|
||||||
Subscriptions: []Subscription{
|
Subscriptions: []subscription{
|
||||||
{
|
{
|
||||||
Name: "oc-intf-desc",
|
Name: "oc-intf-desc",
|
||||||
Origin: "openconfig-interfaces",
|
Origin: "openconfig-interfaces",
|
||||||
|
|
@ -404,8 +404,8 @@ func TestNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
server: &MockServer{
|
server: &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
tagResponse := &gnmi.SubscribeResponse{
|
tagResponse := &gnmi.SubscribeResponse{
|
||||||
Response: &gnmi.SubscribeResponse_Update{
|
Response: &gnmi.SubscribeResponse_Update{
|
||||||
Update: &gnmi.Notification{
|
Update: &gnmi.Notification{
|
||||||
|
|
@ -507,9 +507,9 @@ func TestNotification(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
Encoding: "proto",
|
Encoding: "proto",
|
||||||
Redial: config.Duration(1 * time.Second),
|
Redial: config.Duration(1 * time.Second),
|
||||||
TagSubscriptions: []TagSubscription{
|
TagSubscriptions: []tagSubscription{
|
||||||
{
|
{
|
||||||
Subscription: Subscription{
|
subscription: subscription{
|
||||||
Name: "oc-neigh-desc",
|
Name: "oc-neigh-desc",
|
||||||
Origin: "openconfig",
|
Origin: "openconfig",
|
||||||
Path: "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/description",
|
Path: "/network-instances/network-instance/protocols/protocol/bgp/neighbors/neighbor/state/description",
|
||||||
|
|
@ -518,7 +518,7 @@ func TestNotification(t *testing.T) {
|
||||||
Elements: []string{"network-instance", "protocol", "neighbor"},
|
Elements: []string{"network-instance", "protocol", "neighbor"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Subscriptions: []Subscription{
|
Subscriptions: []subscription{
|
||||||
{
|
{
|
||||||
Name: "oc-neigh-state",
|
Name: "oc-neigh-state",
|
||||||
Origin: "openconfig",
|
Origin: "openconfig",
|
||||||
|
|
@ -527,8 +527,8 @@ func TestNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
server: &MockServer{
|
server: &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
tagResponse := &gnmi.SubscribeResponse{
|
tagResponse := &gnmi.SubscribeResponse{
|
||||||
Response: &gnmi.SubscribeResponse_Update{
|
Response: &gnmi.SubscribeResponse_Update{
|
||||||
Update: &gnmi.Notification{
|
Update: &gnmi.Notification{
|
||||||
|
|
@ -665,7 +665,7 @@ func TestNotification(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
Encoding: "proto",
|
Encoding: "proto",
|
||||||
Redial: config.Duration(1 * time.Second),
|
Redial: config.Duration(1 * time.Second),
|
||||||
Subscriptions: []Subscription{
|
Subscriptions: []subscription{
|
||||||
{
|
{
|
||||||
Name: "interfaces",
|
Name: "interfaces",
|
||||||
Origin: "openconfig",
|
Origin: "openconfig",
|
||||||
|
|
@ -675,8 +675,8 @@ func TestNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
server: &MockServer{
|
server: &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
|
if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -782,7 +782,7 @@ func TestNotification(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
Encoding: "proto",
|
Encoding: "proto",
|
||||||
Redial: config.Duration(1 * time.Second),
|
Redial: config.Duration(1 * time.Second),
|
||||||
Subscriptions: []Subscription{
|
Subscriptions: []subscription{
|
||||||
{
|
{
|
||||||
Name: "temperature",
|
Name: "temperature",
|
||||||
Origin: "openconfig-platform",
|
Origin: "openconfig-platform",
|
||||||
|
|
@ -792,8 +792,8 @@ func TestNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
server: &MockServer{
|
server: &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
|
if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -914,7 +914,7 @@ func TestNotification(t *testing.T) {
|
||||||
Encoding: "proto",
|
Encoding: "proto",
|
||||||
VendorSpecific: []string{"juniper_header"},
|
VendorSpecific: []string{"juniper_header"},
|
||||||
Redial: config.Duration(1 * time.Second),
|
Redial: config.Duration(1 * time.Second),
|
||||||
Subscriptions: []Subscription{
|
Subscriptions: []subscription{
|
||||||
{
|
{
|
||||||
Name: "type",
|
Name: "type",
|
||||||
Origin: "openconfig-platform",
|
Origin: "openconfig-platform",
|
||||||
|
|
@ -924,8 +924,8 @@ func TestNotification(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
server: &MockServer{
|
server: &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
|
if err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -1002,7 +1002,7 @@ func TestNotification(t *testing.T) {
|
||||||
tt.plugin.Addresses = []string{listener.Addr().String()}
|
tt.plugin.Addresses = []string{listener.Addr().String()}
|
||||||
|
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
tt.server.GRPCServer = grpcServer
|
tt.server.grpcServer = grpcServer
|
||||||
gnmi.RegisterGNMIServer(grpcServer, tt.server)
|
gnmi.RegisterGNMIServer(grpcServer, tt.server)
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
@ -1029,17 +1029,6 @@ func TestNotification(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type MockLogger struct {
|
|
||||||
telegraf.Logger
|
|
||||||
lastFormat string
|
|
||||||
lastArgs []interface{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *MockLogger) Errorf(format string, args ...interface{}) {
|
|
||||||
l.lastFormat = format
|
|
||||||
l.lastArgs = args
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRedial(t *testing.T) {
|
func TestRedial(t *testing.T) {
|
||||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -1053,12 +1042,12 @@ func TestRedial(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
gnmiServer := &MockServer{
|
gnmiServer := &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
notification := mockGNMINotification()
|
notification := mockGNMINotification()
|
||||||
return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
|
return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
|
||||||
},
|
},
|
||||||
GRPCServer: grpcServer,
|
grpcServer: grpcServer,
|
||||||
}
|
}
|
||||||
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
||||||
|
|
||||||
|
|
@ -1084,15 +1073,15 @@ func TestRedial(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
grpcServer = grpc.NewServer()
|
grpcServer = grpc.NewServer()
|
||||||
gnmiServer = &MockServer{
|
gnmiServer = &mockServer{
|
||||||
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
subscribeF: func(server gnmi.GNMI_SubscribeServer) error {
|
||||||
notification := mockGNMINotification()
|
notification := mockGNMINotification()
|
||||||
notification.Prefix.Elem[0].Key["foo"] = "bar2"
|
notification.Prefix.Elem[0].Key["foo"] = "bar2"
|
||||||
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
|
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
|
||||||
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}}
|
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}}
|
||||||
return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
|
return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
|
||||||
},
|
},
|
||||||
GRPCServer: grpcServer,
|
grpcServer: grpcServer,
|
||||||
}
|
}
|
||||||
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
||||||
|
|
||||||
|
|
@ -1116,7 +1105,7 @@ func TestCases(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Register the plugin
|
// Register the plugin
|
||||||
inputs.Add("gnmi", New)
|
inputs.Add("gnmi", newGNMI)
|
||||||
|
|
||||||
for _, f := range folders {
|
for _, f := range folders {
|
||||||
// Only handle folders
|
// Only handle folders
|
||||||
|
|
@ -1188,9 +1177,9 @@ func TestCases(t *testing.T) {
|
||||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
gnmiServer := &MockServer{
|
gnmiServer := &mockServer{
|
||||||
SubscribeF: responseFunction,
|
subscribeF: responseFunction,
|
||||||
GRPCServer: grpcServer,
|
grpcServer: grpcServer,
|
||||||
}
|
}
|
||||||
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
gnmi.RegisterGNMIServer(grpcServer, gnmiServer)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ const eidJuniperTelemetryHeader = 1
|
||||||
type handler struct {
|
type handler struct {
|
||||||
address string
|
address string
|
||||||
aliases map[*pathInfo]string
|
aliases map[*pathInfo]string
|
||||||
tagsubs []TagSubscription
|
tagsubs []tagSubscription
|
||||||
maxMsgSize int
|
maxMsgSize int
|
||||||
emptyNameWarnShown bool
|
emptyNameWarnShown bool
|
||||||
vendorExt []string
|
vendorExt []string
|
||||||
|
|
@ -170,7 +170,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
|
||||||
h.log.Errorf("unable to parse address %s: %v", h.address, err)
|
h.log.Errorf("unable to parse address %s: %v", h.address, err)
|
||||||
}
|
}
|
||||||
if !prefix.empty() {
|
if !prefix.empty() {
|
||||||
headerTags["path"] = prefix.FullPath()
|
headerTags["path"] = prefix.fullPath()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process and remove tag-updates from the response first so we can
|
// Process and remove tag-updates from the response first so we can
|
||||||
|
|
@ -192,7 +192,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
|
||||||
for key, val := range headerTags {
|
for key, val := range headerTags {
|
||||||
tags[key] = val
|
tags[key] = val
|
||||||
}
|
}
|
||||||
for key, val := range fullPath.Tags(h.tagPathPrefix) {
|
for key, val := range fullPath.tags(h.tagPathPrefix) {
|
||||||
tags[key] = val
|
tags[key] = val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -229,7 +229,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare tags from prefix
|
// Prepare tags from prefix
|
||||||
fieldTags := field.path.Tags(h.tagPathPrefix)
|
fieldTags := field.path.tags(h.tagPathPrefix)
|
||||||
tags := make(map[string]string, len(headerTags)+len(fieldTags))
|
tags := make(map[string]string, len(headerTags)+len(fieldTags))
|
||||||
for key, val := range headerTags {
|
for key, val := range headerTags {
|
||||||
tags[key] = val
|
tags[key] = val
|
||||||
|
|
@ -278,7 +278,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
|
||||||
key = relative
|
key = relative
|
||||||
} else {
|
} else {
|
||||||
// Otherwise use the last path element as the field key
|
// Otherwise use the last path element as the field key
|
||||||
key = field.path.Base()
|
key = field.path.base()
|
||||||
}
|
}
|
||||||
key = strings.ReplaceAll(key, "-", "_")
|
key = strings.ReplaceAll(key, "-", "_")
|
||||||
}
|
}
|
||||||
|
|
@ -328,7 +328,7 @@ func guessPrefixFromUpdate(fields []updateField) string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
if len(fields) == 1 {
|
if len(fields) == 1 {
|
||||||
return fields[0].path.Dir()
|
return fields[0].path.dir()
|
||||||
}
|
}
|
||||||
commonPath := &pathInfo{
|
commonPath := &pathInfo{
|
||||||
origin: fields[0].path.origin,
|
origin: fields[0].path.origin,
|
||||||
|
|
|
||||||
|
|
@ -290,7 +290,7 @@ func (pi *pathInfo) keepCommonPart(path *pathInfo) {
|
||||||
pi.segments = pi.segments[:matchLen]
|
pi.segments = pi.segments[:matchLen]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pi *pathInfo) Dir() string {
|
func (pi *pathInfo) dir() string {
|
||||||
if len(pi.segments) <= 1 {
|
if len(pi.segments) <= 1 {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
@ -309,7 +309,7 @@ func (pi *pathInfo) Dir() string {
|
||||||
return dir
|
return dir
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pi *pathInfo) Base() string {
|
func (pi *pathInfo) base() string {
|
||||||
if len(pi.segments) == 0 {
|
if len(pi.segments) == 0 {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
@ -321,7 +321,7 @@ func (pi *pathInfo) Base() string {
|
||||||
return s.id
|
return s.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pi *pathInfo) Path() (origin, path string) {
|
func (pi *pathInfo) path() (origin, path string) {
|
||||||
if len(pi.segments) == 0 {
|
if len(pi.segments) == 0 {
|
||||||
return pi.origin, "/"
|
return pi.origin, "/"
|
||||||
}
|
}
|
||||||
|
|
@ -333,7 +333,7 @@ func (pi *pathInfo) Path() (origin, path string) {
|
||||||
return pi.origin, path
|
return pi.origin, path
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pi *pathInfo) FullPath() string {
|
func (pi *pathInfo) fullPath() string {
|
||||||
var path string
|
var path string
|
||||||
if pi.origin != "" {
|
if pi.origin != "" {
|
||||||
path = pi.origin + ":"
|
path = pi.origin + ":"
|
||||||
|
|
@ -360,14 +360,14 @@ func (pi *pathInfo) String() string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
origin, path := pi.Path()
|
origin, path := pi.path()
|
||||||
if origin != "" {
|
if origin != "" {
|
||||||
return origin + ":" + path
|
return origin + ":" + path
|
||||||
}
|
}
|
||||||
return path
|
return path
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pi *pathInfo) Tags(pathPrefix bool) map[string]string {
|
func (pi *pathInfo) tags(pathPrefix bool) map[string]string {
|
||||||
tags := make(map[string]string, len(pi.keyValues))
|
tags := make(map[string]string, len(pi.keyValues))
|
||||||
for _, s := range pi.keyValues {
|
for _, s := range pi.keyValues {
|
||||||
var prefix string
|
var prefix string
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ type elementsStore struct {
|
||||||
tags map[string]map[string]string
|
tags map[string]map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTagStore(subs []TagSubscription) *tagStore {
|
func newTagStore(subs []tagSubscription) *tagStore {
|
||||||
store := tagStore{
|
store := tagStore{
|
||||||
unconditional: make(map[string]string),
|
unconditional: make(map[string]string),
|
||||||
names: make(map[string]map[string]string),
|
names: make(map[string]map[string]string),
|
||||||
|
|
@ -38,13 +38,13 @@ func newTagStore(subs []TagSubscription) *tagStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store tags extracted from TagSubscriptions
|
// Store tags extracted from TagSubscriptions
|
||||||
func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values []updateField, tags map[string]string) error {
|
func (s *tagStore) insert(subscription tagSubscription, path *pathInfo, values []updateField, tags map[string]string) error {
|
||||||
switch subscription.Match {
|
switch subscription.Match {
|
||||||
case "unconditional":
|
case "unconditional":
|
||||||
for _, f := range values {
|
for _, f := range values {
|
||||||
tagName := subscription.Name
|
tagName := subscription.Name
|
||||||
if len(f.path.segments) > 0 {
|
if len(f.path.segments) > 0 {
|
||||||
key := f.path.Base()
|
key := f.path.base()
|
||||||
key = strings.ReplaceAll(key, "-", "_")
|
key = strings.ReplaceAll(key, "-", "_")
|
||||||
tagName += "/" + key
|
tagName += "/" + key
|
||||||
}
|
}
|
||||||
|
|
@ -74,7 +74,7 @@ func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values [
|
||||||
for _, f := range values {
|
for _, f := range values {
|
||||||
tagName := subscription.Name
|
tagName := subscription.Name
|
||||||
if len(f.path.segments) > 0 {
|
if len(f.path.segments) > 0 {
|
||||||
key := f.path.Base()
|
key := f.path.base()
|
||||||
key = strings.ReplaceAll(key, "-", "_")
|
key = strings.ReplaceAll(key, "-", "_")
|
||||||
tagName += "/" + key
|
tagName += "/" + key
|
||||||
}
|
}
|
||||||
|
|
@ -103,7 +103,7 @@ func (s *tagStore) insert(subscription TagSubscription, path *pathInfo, values [
|
||||||
for _, f := range values {
|
for _, f := range values {
|
||||||
tagName := subscription.Name
|
tagName := subscription.Name
|
||||||
if len(f.path.segments) > 0 {
|
if len(f.path.segments) > 0 {
|
||||||
key := f.path.Base()
|
key := f.path.base()
|
||||||
key = strings.ReplaceAll(key, "-", "_")
|
key = strings.ReplaceAll(key, "-", "_")
|
||||||
tagName += "/" + key
|
tagName += "/" + key
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,7 @@ func (h *handler) processJSONIETF(path *pathInfo, data []byte) ([]updateField, e
|
||||||
// Try to lookup the full path to decode the field according to the
|
// Try to lookup the full path to decode the field according to the
|
||||||
// YANG model if any
|
// YANG model if any
|
||||||
if h.decoder != nil {
|
if h.decoder != nil {
|
||||||
origin, fieldPath := p.Path()
|
origin, fieldPath := p.path()
|
||||||
if decoded, err := h.decoder.DecodePathElement(origin, fieldPath, entry.value); err != nil {
|
if decoded, err := h.decoder.DecodePathElement(origin, fieldPath, entry.value); err != nil {
|
||||||
h.log.Debugf("Decoding %s failed: %v", p, err)
|
h.log.Debugf("Decoding %s failed: %v", p, err)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -29,36 +29,32 @@ const (
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
type GCS struct {
|
type GCS struct {
|
||||||
CredentialsFile string `toml:"credentials_file"`
|
CredentialsFile string `toml:"credentials_file"`
|
||||||
Bucket string `toml:"bucket"`
|
Bucket string `toml:"bucket"`
|
||||||
|
Prefix string `toml:"key_prefix"`
|
||||||
Prefix string `toml:"key_prefix"`
|
OffsetKey string `toml:"offset_key"`
|
||||||
OffsetKey string `toml:"offset_key"`
|
ObjectsPerIteration int `toml:"objects_per_iteration"`
|
||||||
ObjectsPerIteration int `toml:"objects_per_iteration"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
Log telegraf.Logger
|
|
||||||
offSet OffSet
|
|
||||||
|
|
||||||
|
offSet offSet
|
||||||
parser telegraf.Parser
|
parser telegraf.Parser
|
||||||
client *storage.Client
|
client *storage.Client
|
||||||
|
ctx context.Context
|
||||||
ctx context.Context
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type OffSet struct {
|
type offSet struct {
|
||||||
OffSet string `json:"offSet"`
|
OffSet string `json:"offSet"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEmptyOffset() *OffSet {
|
func (gcs *GCS) Init() error {
|
||||||
return &OffSet{OffSet: ""}
|
gcs.ctx = context.Background()
|
||||||
}
|
err := gcs.setUpClient()
|
||||||
|
if err != nil {
|
||||||
|
gcs.Log.Error("Could not create client", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func NewOffset(offset string) *OffSet {
|
return gcs.setOffset()
|
||||||
return &OffSet{OffSet: offset}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (offSet *OffSet) isPresent() bool {
|
|
||||||
return offSet.OffSet != ""
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gcs *GCS) SampleConfig() string {
|
func (gcs *GCS) SampleConfig() string {
|
||||||
|
|
@ -163,7 +159,7 @@ func (gcs *GCS) updateOffset(bucket *storage.BucketHandle, name string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
offsetModel := NewOffset(name)
|
offsetModel := newOffset(name)
|
||||||
marshalled, err := json.Marshal(offsetModel)
|
marshalled, err := json.Marshal(offsetModel)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -184,17 +180,6 @@ func (gcs *GCS) updateOffset(bucket *storage.BucketHandle, name string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gcs *GCS) Init() error {
|
|
||||||
gcs.ctx = context.Background()
|
|
||||||
err := gcs.setUpClient()
|
|
||||||
if err != nil {
|
|
||||||
gcs.Log.Error("Could not create client", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return gcs.setOffset()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gcs *GCS) setUpClient() error {
|
func (gcs *GCS) setUpClient() error {
|
||||||
if endpoint, present := os.LookupEnv(emulatorHostEnv); present {
|
if endpoint, present := os.LookupEnv(emulatorHostEnv); present {
|
||||||
return gcs.setUpLocalClient(endpoint)
|
return gcs.setUpLocalClient(endpoint)
|
||||||
|
|
@ -250,7 +235,7 @@ func (gcs *GCS) setOffset() error {
|
||||||
btk := gcs.client.Bucket(gcs.Bucket)
|
btk := gcs.client.Bucket(gcs.Bucket)
|
||||||
obj := btk.Object(gcs.OffsetKey)
|
obj := btk.Object(gcs.OffsetKey)
|
||||||
|
|
||||||
var offSet OffSet
|
var offSet offSet
|
||||||
|
|
||||||
if r, err := obj.NewReader(gcs.ctx); err == nil {
|
if r, err := obj.NewReader(gcs.ctx); err == nil {
|
||||||
defer gcs.closeReader(r)
|
defer gcs.closeReader(r)
|
||||||
|
|
@ -262,7 +247,7 @@ func (gcs *GCS) setOffset() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
offSet = *NewEmptyOffset()
|
offSet = *newEmptyOffset()
|
||||||
}
|
}
|
||||||
|
|
||||||
gcs.offSet = offSet
|
gcs.offSet = offSet
|
||||||
|
|
@ -270,15 +255,27 @@ func (gcs *GCS) setOffset() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (gcs *GCS) closeReader(r *storage.Reader) {
|
||||||
|
if err := r.Close(); err != nil {
|
||||||
|
gcs.Log.Errorf("Could not close reader: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEmptyOffset() *offSet {
|
||||||
|
return &offSet{OffSet: ""}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newOffset(offset string) *offSet {
|
||||||
|
return &offSet{OffSet: offset}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (offSet *offSet) isPresent() bool {
|
||||||
|
return offSet.OffSet != ""
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("google_cloud_storage", func() telegraf.Input {
|
inputs.Add("google_cloud_storage", func() telegraf.Input {
|
||||||
gcs := &GCS{}
|
gcs := &GCS{}
|
||||||
return gcs
|
return gcs
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gcs *GCS) closeReader(r *storage.Reader) {
|
|
||||||
if err := r.Close(); err != nil {
|
|
||||||
gcs.Log.Errorf("Could not close reader: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ type GrayLog struct {
|
||||||
Timeout config.Duration `toml:"timeout"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
|
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
client HTTPClient
|
client httpClient
|
||||||
}
|
}
|
||||||
|
|
||||||
type responseMetrics struct {
|
type responseMetrics struct {
|
||||||
|
|
@ -54,7 +54,7 @@ type realHTTPClient struct {
|
||||||
client *http.Client
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
type HTTPClient interface {
|
type httpClient interface {
|
||||||
// Returns the result of an http request
|
// Returns the result of an http request
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
|
|
@ -63,21 +63,20 @@ type HTTPClient interface {
|
||||||
// Returns:
|
// Returns:
|
||||||
// http.Response: HTTP response object
|
// http.Response: HTTP response object
|
||||||
// error : Any error that may have occurred
|
// error : Any error that may have occurred
|
||||||
MakeRequest(req *http.Request) (*http.Response, error)
|
makeRequest(req *http.Request) (*http.Response, error)
|
||||||
|
setHTTPClient(client *http.Client)
|
||||||
SetHTTPClient(client *http.Client)
|
httpClient() *http.Client
|
||||||
HTTPClient() *http.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *realHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
|
func (c *realHTTPClient) makeRequest(req *http.Request) (*http.Response, error) {
|
||||||
return c.client.Do(req)
|
return c.client.Do(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *realHTTPClient) SetHTTPClient(client *http.Client) {
|
func (c *realHTTPClient) setHTTPClient(client *http.Client) {
|
||||||
c.client = client
|
c.client = client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *realHTTPClient) HTTPClient() *http.Client {
|
func (c *realHTTPClient) httpClient() *http.Client {
|
||||||
return c.client
|
return c.client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -85,11 +84,10 @@ func (*GrayLog) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gathers data for all servers.
|
|
||||||
func (h *GrayLog) Gather(acc telegraf.Accumulator) error {
|
func (h *GrayLog) Gather(acc telegraf.Accumulator) error {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
if h.client.HTTPClient() == nil {
|
if h.client.httpClient() == nil {
|
||||||
tlsCfg, err := h.ClientConfig.TLSConfig()
|
tlsCfg, err := h.ClientConfig.TLSConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -102,7 +100,7 @@ func (h *GrayLog) Gather(acc telegraf.Accumulator) error {
|
||||||
Transport: tr,
|
Transport: tr,
|
||||||
Timeout: time.Duration(h.Timeout),
|
Timeout: time.Duration(h.Timeout),
|
||||||
}
|
}
|
||||||
h.client.SetHTTPClient(client)
|
h.client.setHTTPClient(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, server := range h.Servers {
|
for _, server := range h.Servers {
|
||||||
|
|
@ -190,7 +188,7 @@ func (h *GrayLog) flatten(item, fields map[string]interface{}, id string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends an HTTP request to the server using the GrayLog object's HTTPClient.
|
// Sends an HTTP request to the server using the GrayLog object's httpClient.
|
||||||
// Parameters:
|
// Parameters:
|
||||||
//
|
//
|
||||||
// serverURL: endpoint to send request to
|
// serverURL: endpoint to send request to
|
||||||
|
|
@ -233,7 +231,7 @@ func (h *GrayLog) sendRequest(serverURL string) (string, float64, error) {
|
||||||
req.Header.Add(k, v)
|
req.Header.Add(k, v)
|
||||||
}
|
}
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
resp, err := h.client.MakeRequest(req)
|
resp, err := h.client.makeRequest(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", -1, err
|
return "", -1, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -97,7 +97,7 @@ type mockHTTPClient struct {
|
||||||
// Mock implementation of MakeRequest. Usually returns an http.Response with
|
// Mock implementation of MakeRequest. Usually returns an http.Response with
|
||||||
// hard-coded responseBody and statusCode. However, if the request uses a
|
// hard-coded responseBody and statusCode. However, if the request uses a
|
||||||
// nonstandard method, it uses status code 405 (method not allowed)
|
// nonstandard method, it uses status code 405 (method not allowed)
|
||||||
func (c *mockHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
|
func (c *mockHTTPClient) makeRequest(req *http.Request) (*http.Response, error) {
|
||||||
resp := http.Response{}
|
resp := http.Response{}
|
||||||
resp.StatusCode = c.statusCode
|
resp.StatusCode = c.statusCode
|
||||||
|
|
||||||
|
|
@ -119,10 +119,10 @@ func (c *mockHTTPClient) MakeRequest(req *http.Request) (*http.Response, error)
|
||||||
return &resp, nil
|
return &resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockHTTPClient) SetHTTPClient(_ *http.Client) {
|
func (c *mockHTTPClient) setHTTPClient(_ *http.Client) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *mockHTTPClient) HTTPClient() *http.Client {
|
func (c *mockHTTPClient) httpClient() *http.Client {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue