diff --git a/plugins/inputs/jenkins/client.go b/plugins/inputs/jenkins/client.go index 43555f094..d91d62f74 100644 --- a/plugins/inputs/jenkins/client.go +++ b/plugins/inputs/jenkins/client.go @@ -75,42 +75,42 @@ func (c *client) doGet(ctx context.Context, url string, v interface{}) error { // Clear invalid token if unauthorized if resp.StatusCode == http.StatusUnauthorized { c.sessionCookie = nil - return APIError{ - URL: url, - StatusCode: resp.StatusCode, - Title: resp.Status, + return apiError{ + url: url, + statusCode: resp.StatusCode, + title: resp.Status, } } if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return APIError{ - URL: url, - StatusCode: resp.StatusCode, - Title: resp.Status, + return apiError{ + url: url, + statusCode: resp.StatusCode, + title: resp.Status, } } if resp.StatusCode == http.StatusNoContent { - return APIError{ - URL: url, - StatusCode: resp.StatusCode, - Title: resp.Status, + return apiError{ + url: url, + statusCode: resp.StatusCode, + title: resp.Status, } } return json.NewDecoder(resp.Body).Decode(v) } -type APIError struct { - URL string - StatusCode int - Title string - Description string +type apiError struct { + url string + statusCode int + title string + description string } -func (e APIError) Error() string { - if e.Description != "" { - return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description) +func (e apiError) Error() string { + if e.description != "" { + return fmt.Sprintf("[%s] %s: %s", e.url, e.title, e.description) } - return fmt.Sprintf("[%s] %s", e.URL, e.Title) + return fmt.Sprintf("[%s] %s", e.url, e.title) } func createGetRequest(url, username, password string, sessionCookie *http.Cookie) (*http.Request, error) { @@ -132,7 +132,7 @@ func (c *client) getJobs(ctx context.Context, jr *jobRequest) (js *jobResponse, js = new(jobResponse) url := jobPath if jr != nil { - url = jr.URL() + url = jr.url() } err = c.doGet(ctx, url, js) return js, err diff --git a/plugins/inputs/jenkins/jenkins.go b/plugins/inputs/jenkins/jenkins.go index 62bd2ec9c..274766d94 100644 --- a/plugins/inputs/jenkins/jenkins.go +++ b/plugins/inputs/jenkins/jenkins.go @@ -24,20 +24,20 @@ import ( //go:embed sample.conf var sampleConfig string -// Jenkins plugin gathers information about the nodes and jobs running in a jenkins instance. +const ( + measurementJenkins = "jenkins" + measurementNode = "jenkins_node" + measurementJob = "jenkins_job" +) + type Jenkins struct { - URL string - Username string - Password string - Source string - Port string + URL string `toml:"url"` + Username string `toml:"username"` + Password string `toml:"password"` // HTTP Timeout specified as a string - 3s, 1m, 1h - ResponseTimeout config.Duration - - tls.ClientConfig - client *client - - Log telegraf.Logger + ResponseTimeout config.Duration `toml:"response_timeout"` + source string + port string MaxConnections int `toml:"max_connections"` MaxBuildAge config.Duration `toml:"max_build_age"` @@ -52,21 +52,18 @@ type Jenkins struct { NodeInclude []string `toml:"node_include"` nodeFilter filter.Filter + tls.ClientConfig + client *client + + Log telegraf.Logger `toml:"-"` + semaphore chan struct{} } -// measurement -const ( - measurementJenkins = "jenkins" - measurementNode = "jenkins_node" - measurementJob = "jenkins_job" -) - func (*Jenkins) SampleConfig() string { return sampleConfig } -// Gather implements telegraf.Input interface func (j *Jenkins) Gather(acc telegraf.Accumulator) error { if j.client == nil { client, err := j.newHTTPClient() @@ -109,14 +106,14 @@ func (j *Jenkins) initialize(client *http.Client) error { } if u.Port() == "" { if u.Scheme == "http" { - j.Port = "80" + j.port = "80" } else if u.Scheme == "https" { - j.Port = "443" + j.port = "443" } } else { - j.Port = u.Port() + j.port = u.Port() } - j.Source = u.Hostname() + j.source = u.Hostname() // init filters j.jobFilter, err = filter.NewIncludeExcludeFilter(j.JobInclude, j.JobExclude) @@ -168,8 +165,8 @@ func (j *Jenkins) gatherNodeData(n node, acc telegraf.Accumulator) error { tags["status"] = "offline" } - tags["source"] = j.Source - tags["port"] = j.Port + tags["source"] = j.source + tags["port"] = j.port fields := make(map[string]interface{}) fields["num_executors"] = n.NumExecutors @@ -218,7 +215,7 @@ func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) { } // get total and busy executors - tags := map[string]string{"source": j.Source, "port": j.Port} + tags := map[string]string{"source": j.source, "port": j.port} fields := make(map[string]interface{}) fields["busy_executors"] = nodeResp.BusyExecutors fields["total_executors"] = nodeResp.TotalExecutors @@ -314,7 +311,7 @@ func (j *Jenkins) getJobDetail(jr jobRequest, acc telegraf.Accumulator) error { cutoff := time.Now().Add(-1 * time.Duration(j.MaxBuildAge)) // Here we just test - if build.GetTimestamp().Before(cutoff) { + if build.getTimestamp().Before(cutoff) { return nil } @@ -389,7 +386,7 @@ type buildResponse struct { Timestamp int64 `json:"timestamp"` } -func (b *buildResponse) GetTimestamp() time.Time { +func (b *buildResponse) getTimestamp() time.Time { return time.Unix(0, b.Timestamp*int64(time.Millisecond)) } @@ -418,7 +415,7 @@ func (jr jobRequest) combinedEscaped() []string { return jobs } -func (jr jobRequest) URL() string { +func (jr jobRequest) url() string { return "/job/" + strings.Join(jr.combinedEscaped(), "/job/") + jobPath } @@ -435,13 +432,13 @@ func (jr jobRequest) parentsString() string { } func (j *Jenkins) gatherJobBuild(jr jobRequest, b *buildResponse, acc telegraf.Accumulator) { - tags := map[string]string{"name": jr.name, "parents": jr.parentsString(), "result": b.Result, "source": j.Source, "port": j.Port} + tags := map[string]string{"name": jr.name, "parents": jr.parentsString(), "result": b.Result, "source": j.source, "port": j.port} fields := make(map[string]interface{}) fields["duration"] = b.Duration fields["result_code"] = mapResultCode(b.Result) fields["number"] = b.Number - acc.AddFields(measurementJob, fields, tags, b.GetTimestamp()) + acc.AddFields(measurementJob, fields, tags, b.getTimestamp()) } // perform status mapping diff --git a/plugins/inputs/jenkins/jenkins_test.go b/plugins/inputs/jenkins/jenkins_test.go index 30027b47c..6ee429ef7 100644 --- a/plugins/inputs/jenkins/jenkins_test.go +++ b/plugins/inputs/jenkins/jenkins_test.go @@ -46,7 +46,7 @@ func TestJobRequest(t *testing.T) { } for _, test := range tests { hierarchyName := test.input.hierarchyName() - address := test.input.URL() + address := test.input.url() if hierarchyName != test.hierarchyName { t.Errorf("Expected %s, got %s\n", test.hierarchyName, hierarchyName) } diff --git a/plugins/inputs/jolokia2_agent/jolokia2_agent.go b/plugins/inputs/jolokia2_agent/jolokia2_agent.go index 00728f7cb..39e9adf8c 100644 --- a/plugins/inputs/jolokia2_agent/jolokia2_agent.go +++ b/plugins/inputs/jolokia2_agent/jolokia2_agent.go @@ -19,9 +19,9 @@ import ( var sampleConfig string type JolokiaAgent struct { - DefaultFieldPrefix string - DefaultFieldSeparator string - DefaultTagPrefix string + DefaultFieldPrefix string `toml:"default_field_prefix"` + DefaultFieldSeparator string `toml:"default_field_separator"` + DefaultTagPrefix string `toml:"default_tag_prefix"` URLs []string `toml:"urls"` Username string `toml:"username"` diff --git a/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go b/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go index ee6fadc67..256027f68 100644 --- a/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go +++ b/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go @@ -86,7 +86,7 @@ func TestScalarValues(t *testing.T) { server := setupServer(response) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -165,7 +165,7 @@ func TestObjectValues(t *testing.T) { server := setupServer(string(response)) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -253,7 +253,7 @@ func TestStatusCodes(t *testing.T) { server := setupServer(response) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -303,7 +303,7 @@ func TestTagRenaming(t *testing.T) { server := setupServer(response) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -396,7 +396,7 @@ func TestFieldRenaming(t *testing.T) { server := setupServer(response) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -504,7 +504,7 @@ func TestMetricMbeanMatching(t *testing.T) { server := setupServer(response) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -597,7 +597,7 @@ func TestMetricCompaction(t *testing.T) { server := setupServer(response) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -643,7 +643,7 @@ func TestJolokia2_ClientAuthRequest(t *testing.T) { })) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(` + plugin := setupPlugin(t, fmt.Sprintf(` [jolokia2_agent] urls = ["%s/jolokia"] username = "sally" @@ -904,7 +904,7 @@ func setupServer(resp string) *httptest.Server { })) } -func SetupPlugin(t *testing.T, conf string) telegraf.Input { +func setupPlugin(t *testing.T, conf string) telegraf.Input { table, err := toml.Parse([]byte(conf)) if err != nil { t.Fatalf("Unable to parse config! %v", err) diff --git a/plugins/inputs/jolokia2_proxy/jolokia2_proxy.go b/plugins/inputs/jolokia2_proxy/jolokia2_proxy.go index b9018789d..60c0d11d5 100644 --- a/plugins/inputs/jolokia2_proxy/jolokia2_proxy.go +++ b/plugins/inputs/jolokia2_proxy/jolokia2_proxy.go @@ -23,7 +23,7 @@ type JolokiaProxy struct { URL string `toml:"url"` DefaultTargetPassword string `toml:"default_target_password"` DefaultTargetUsername string `toml:"default_target_username"` - Targets []JolokiaProxyTargetConfig `toml:"target"` + Targets []jolokiaProxyTargetConfig `toml:"target"` Username string `toml:"username"` Password string `toml:"password"` @@ -36,7 +36,7 @@ type JolokiaProxy struct { gatherer *common.Gatherer } -type JolokiaProxyTargetConfig struct { +type jolokiaProxyTargetConfig struct { URL string `toml:"url"` Username string `toml:"username"` Password string `toml:"password"` diff --git a/plugins/inputs/jolokia2_proxy/jolokia2_proxy_test.go b/plugins/inputs/jolokia2_proxy/jolokia2_proxy_test.go index 7adb318d0..037114719 100644 --- a/plugins/inputs/jolokia2_proxy/jolokia2_proxy_test.go +++ b/plugins/inputs/jolokia2_proxy/jolokia2_proxy_test.go @@ -57,7 +57,7 @@ func TestJolokia2_ProxyTargets(t *testing.T) { server := setupServer(response) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) + plugin := setupPlugin(t, fmt.Sprintf(config, server.URL)) var acc testutil.Accumulator require.NoError(t, plugin.Gather(&acc)) @@ -106,7 +106,7 @@ func TestJolokia2_ClientProxyAuthRequest(t *testing.T) { })) defer server.Close() - plugin := SetupPlugin(t, fmt.Sprintf(` + plugin := setupPlugin(t, fmt.Sprintf(` [jolokia2_proxy] url = "%s/jolokia" username = "sally" @@ -169,7 +169,7 @@ func setupServer(resp string) *httptest.Server { })) } -func SetupPlugin(t *testing.T, conf string) telegraf.Input { +func setupPlugin(t *testing.T, conf string) telegraf.Input { table, err := toml.Parse([]byte(conf)) if err != nil { t.Fatalf("Unable to parse config! %v", err) diff --git a/plugins/inputs/jti_openconfig_telemetry/collection.go b/plugins/inputs/jti_openconfig_telemetry/collection.go index 206b04d7f..eabc44d24 100644 --- a/plugins/inputs/jti_openconfig_telemetry/collection.go +++ b/plugins/inputs/jti_openconfig_telemetry/collection.go @@ -2,21 +2,21 @@ package jti_openconfig_telemetry import "sort" -type DataGroup struct { +type dataGroup struct { numKeys int tags map[string]string data map[string]interface{} } // Sort the data groups by number of keys -type CollectionByKeys []DataGroup +type collectionByKeys []dataGroup -func (a CollectionByKeys) Len() int { return len(a) } -func (a CollectionByKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a CollectionByKeys) Less(i, j int) bool { return a[i].numKeys < a[j].numKeys } +func (a collectionByKeys) Len() int { return len(a) } +func (a collectionByKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a collectionByKeys) Less(i, j int) bool { return a[i].numKeys < a[j].numKeys } // Checks to see if there is already a group with these tags and returns its index. Returns -1 if unavailable. -func (a CollectionByKeys) IsAvailable(tags map[string]string) *DataGroup { +func (a collectionByKeys) isAvailable(tags map[string]string) *dataGroup { sort.Sort(a) // Iterate through all the groups and see if we have group with these tags @@ -45,14 +45,14 @@ func (a CollectionByKeys) IsAvailable(tags map[string]string) *DataGroup { } // Inserts into already existing group or creates a new group -func (a CollectionByKeys) Insert(tags map[string]string, data map[string]interface{}) CollectionByKeys { +func (a collectionByKeys) insert(tags map[string]string, data map[string]interface{}) collectionByKeys { // If there is already a group with this set of tags, insert into it. Otherwise create a new group and insert - if group := a.IsAvailable(tags); group != nil { + if group := a.isAvailable(tags); group != nil { for k, v := range data { group.data[k] = v } } else { - a = append(a, DataGroup{len(tags), tags, data}) + a = append(a, dataGroup{len(tags), tags, data}) } return a diff --git a/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go b/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go index b572d8d34..84cb9aba9 100644 --- a/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go +++ b/plugins/inputs/jti_openconfig_telemetry/jti_openconfig_telemetry.go @@ -31,6 +31,11 @@ import ( //go:embed sample.conf var sampleConfig string +var ( + // Regex to match and extract data points from path value in received key + keyPathRegex = regexp.MustCompile(`/([^/]*)\[([A-Za-z0-9\-/]*=[^\[]*)]`) +) + type OpenConfigTelemetry struct { Servers []string `toml:"servers"` Sensors []string `toml:"sensors"` @@ -45,28 +50,29 @@ type OpenConfigTelemetry struct { KeepAlivePeriod config.Duration `toml:"keep_alive_period"` common_tls.ClientConfig - Log telegraf.Logger + Log telegraf.Logger `toml:"-"` sensorsConfig []sensorConfig grpcClientConns []grpcConnection wg *sync.WaitGroup } +// Structure to hold sensors path list and measurement name +type sensorConfig struct { + measurementName string + pathList []*telemetry.Path +} + type grpcConnection struct { connection *grpc.ClientConn cancel context.CancelFunc } -func (g *grpcConnection) Close() { +func (g *grpcConnection) close() { g.connection.Close() g.cancel() } -var ( - // Regex to match and extract data points from path value in received key - keyPathRegex = regexp.MustCompile(`/([^/]*)\[([A-Za-z0-9\-/]*=[^\[]*)]`) -) - func (*OpenConfigTelemetry) SampleConfig() string { return sampleConfig } @@ -82,13 +88,97 @@ func (m *OpenConfigTelemetry) Init() error { return nil } +func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error { + // Build sensors config + if m.splitSensorConfig() == 0 { + return errors.New("no valid sensor configuration available") + } + + // Parse TLS config + var creds credentials.TransportCredentials + if m.EnableTLS { + tlscfg, err := m.ClientConfig.TLSConfig() + if err != nil { + return err + } + creds = credentials.NewTLS(tlscfg) + } else { + creds = insecure.NewCredentials() + } + + // Setup the basic connection options + options := []grpc.DialOption{ + grpc.WithTransportCredentials(creds), + } + + // Add keep-alive settings + if m.KeepAlivePeriod > 0 { + params := keepalive.ClientParameters{ + Time: time.Duration(m.KeepAlivePeriod), + Timeout: 2 * time.Duration(m.KeepAlivePeriod), + } + options = append(options, grpc.WithKeepaliveParams(params)) + } + + // Connect to given list of servers and start collecting data + var grpcClientConn *grpc.ClientConn + var wg sync.WaitGroup + m.wg = &wg + + for _, server := range m.Servers { + ctx, cancel := context.WithCancel(context.Background()) + if len(m.Username) > 0 { + ctx = metadata.AppendToOutgoingContext( + ctx, + "username", m.Username, + "password", m.Password, + "clientid", m.ClientID, + ) + } + + // Extract device address and port + grpcServer, grpcPort, err := net.SplitHostPort(server) + if err != nil { + m.Log.Errorf("Invalid server address: %s", err.Error()) + cancel() + continue + } + + grpcClientConn, err = grpc.NewClient(server, options...) + if err != nil { + m.Log.Errorf("Failed to connect to %s: %s", server, err.Error()) + } else { + m.Log.Debugf("Opened a new gRPC session to %s on port %s", grpcServer, grpcPort) + } + + // Add to the list of client connections + connection := grpcConnection{ + connection: grpcClientConn, + cancel: cancel, + } + m.grpcClientConns = append(m.grpcClientConns, connection) + + if m.Username != "" && m.Password != "" && m.ClientID != "" { + if err := m.authenticate(ctx, server, grpcClientConn); err != nil { + m.Log.Errorf("Error authenticating to %s: %v", grpcServer, err) + continue + } + } + + // Subscribe and gather telemetry data + m.collectData(ctx, grpcServer, grpcClientConn, acc) + } + + return nil +} + func (m *OpenConfigTelemetry) Gather(_ telegraf.Accumulator) error { return nil } func (m *OpenConfigTelemetry) Stop() { for _, grpcClientConn := range m.grpcClientConns { - grpcClientConn.Close() + grpcClientConn.close() } m.wg.Wait() } @@ -123,11 +213,11 @@ func spitTagsNPath(xmlpath string) (string, map[string]string) { // Takes in a OC response, extracts tag information from keys and returns a // list of groups with unique sets of tags+values -func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServer string) []DataGroup { +func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServer string) []dataGroup { // Use empty prefix. We will update this when we iterate over key-value pairs prefix := "" - dgroups := []DataGroup{} + dgroups := []dataGroup{} for _, v := range r.Kv { kv := make(map[string]interface{}) @@ -168,28 +258,22 @@ func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServe finaltags["path"] = r.Path // Insert derived key and value - dgroups = CollectionByKeys(dgroups).Insert(finaltags, kv) + dgroups = collectionByKeys(dgroups).insert(finaltags, kv) // Insert data from message header - dgroups = CollectionByKeys(dgroups).Insert(finaltags, + dgroups = collectionByKeys(dgroups).insert(finaltags, map[string]interface{}{"_sequence": r.SequenceNumber}) - dgroups = CollectionByKeys(dgroups).Insert(finaltags, + dgroups = collectionByKeys(dgroups).insert(finaltags, map[string]interface{}{"_timestamp": r.Timestamp}) - dgroups = CollectionByKeys(dgroups).Insert(finaltags, + dgroups = collectionByKeys(dgroups).insert(finaltags, map[string]interface{}{"_component_id": r.ComponentId}) - dgroups = CollectionByKeys(dgroups).Insert(finaltags, + dgroups = collectionByKeys(dgroups).insert(finaltags, map[string]interface{}{"_subcomponent_id": r.SubComponentId}) } return dgroups } -// Structure to hold sensors path list and measurement name -type sensorConfig struct { - measurementName string - pathList []*telemetry.Path -} - // Takes in sensor configuration and converts it into slice of sensorConfig objects func (m *OpenConfigTelemetry) splitSensorConfig() int { var pathlist []*telemetry.Path @@ -366,90 +450,6 @@ func (m *OpenConfigTelemetry) authenticate(ctx context.Context, server string, g return nil } -func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error { - // Build sensors config - if m.splitSensorConfig() == 0 { - return errors.New("no valid sensor configuration available") - } - - // Parse TLS config - var creds credentials.TransportCredentials - if m.EnableTLS { - tlscfg, err := m.ClientConfig.TLSConfig() - if err != nil { - return err - } - creds = credentials.NewTLS(tlscfg) - } else { - creds = insecure.NewCredentials() - } - - // Setup the basic connection options - options := []grpc.DialOption{ - grpc.WithTransportCredentials(creds), - } - - // Add keep-alive settings - if m.KeepAlivePeriod > 0 { - params := keepalive.ClientParameters{ - Time: time.Duration(m.KeepAlivePeriod), - Timeout: 2 * time.Duration(m.KeepAlivePeriod), - } - options = append(options, grpc.WithKeepaliveParams(params)) - } - - // Connect to given list of servers and start collecting data - var grpcClientConn *grpc.ClientConn - var wg sync.WaitGroup - m.wg = &wg - - for _, server := range m.Servers { - ctx, cancel := context.WithCancel(context.Background()) - if len(m.Username) > 0 { - ctx = metadata.AppendToOutgoingContext( - ctx, - "username", m.Username, - "password", m.Password, - "clientid", m.ClientID, - ) - } - - // Extract device address and port - grpcServer, grpcPort, err := net.SplitHostPort(server) - if err != nil { - m.Log.Errorf("Invalid server address: %s", err.Error()) - cancel() - continue - } - - grpcClientConn, err = grpc.NewClient(server, options...) - if err != nil { - m.Log.Errorf("Failed to connect to %s: %s", server, err.Error()) - } else { - m.Log.Debugf("Opened a new gRPC session to %s on port %s", grpcServer, grpcPort) - } - - // Add to the list of client connections - connection := grpcConnection{ - connection: grpcClientConn, - cancel: cancel, - } - m.grpcClientConns = append(m.grpcClientConns, connection) - - if m.Username != "" && m.Password != "" && m.ClientID != "" { - if err := m.authenticate(ctx, server, grpcClientConn); err != nil { - m.Log.Errorf("Error authenticating to %s: %v", grpcServer, err) - continue - } - } - - // Subscribe and gather telemetry data - m.collectData(ctx, grpcServer, grpcClientConn, acc) - } - - return nil -} - func init() { inputs.Add("jti_openconfig_telemetry", func() telegraf.Input { return &OpenConfigTelemetry{