chore: Fix linter findings for `revive:exported` in `plugins/inputs/j*` (#16078)

This commit is contained in:
Paweł Żak 2024-10-28 14:30:24 +01:00 committed by GitHub
parent 0a6e51d1d6
commit 43c503e734
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 183 additions and 186 deletions

View File

@ -75,42 +75,42 @@ func (c *client) doGet(ctx context.Context, url string, v interface{}) error {
// Clear invalid token if unauthorized // Clear invalid token if unauthorized
if resp.StatusCode == http.StatusUnauthorized { if resp.StatusCode == http.StatusUnauthorized {
c.sessionCookie = nil c.sessionCookie = nil
return APIError{ return apiError{
URL: url, url: url,
StatusCode: resp.StatusCode, statusCode: resp.StatusCode,
Title: resp.Status, title: resp.Status,
} }
} }
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return APIError{ return apiError{
URL: url, url: url,
StatusCode: resp.StatusCode, statusCode: resp.StatusCode,
Title: resp.Status, title: resp.Status,
} }
} }
if resp.StatusCode == http.StatusNoContent { if resp.StatusCode == http.StatusNoContent {
return APIError{ return apiError{
URL: url, url: url,
StatusCode: resp.StatusCode, statusCode: resp.StatusCode,
Title: resp.Status, title: resp.Status,
} }
} }
return json.NewDecoder(resp.Body).Decode(v) return json.NewDecoder(resp.Body).Decode(v)
} }
type APIError struct { type apiError struct {
URL string url string
StatusCode int statusCode int
Title string title string
Description string description string
} }
func (e APIError) Error() string { func (e apiError) Error() string {
if e.Description != "" { if e.description != "" {
return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description) return fmt.Sprintf("[%s] %s: %s", e.url, e.title, e.description)
} }
return fmt.Sprintf("[%s] %s", e.URL, e.Title) return fmt.Sprintf("[%s] %s", e.url, e.title)
} }
func createGetRequest(url, username, password string, sessionCookie *http.Cookie) (*http.Request, error) { func createGetRequest(url, username, password string, sessionCookie *http.Cookie) (*http.Request, error) {
@ -132,7 +132,7 @@ func (c *client) getJobs(ctx context.Context, jr *jobRequest) (js *jobResponse,
js = new(jobResponse) js = new(jobResponse)
url := jobPath url := jobPath
if jr != nil { if jr != nil {
url = jr.URL() url = jr.url()
} }
err = c.doGet(ctx, url, js) err = c.doGet(ctx, url, js)
return js, err return js, err

View File

@ -24,20 +24,20 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Jenkins plugin gathers information about the nodes and jobs running in a jenkins instance. const (
measurementJenkins = "jenkins"
measurementNode = "jenkins_node"
measurementJob = "jenkins_job"
)
type Jenkins struct { type Jenkins struct {
URL string URL string `toml:"url"`
Username string Username string `toml:"username"`
Password string Password string `toml:"password"`
Source string
Port string
// HTTP Timeout specified as a string - 3s, 1m, 1h // HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout config.Duration ResponseTimeout config.Duration `toml:"response_timeout"`
source string
tls.ClientConfig port string
client *client
Log telegraf.Logger
MaxConnections int `toml:"max_connections"` MaxConnections int `toml:"max_connections"`
MaxBuildAge config.Duration `toml:"max_build_age"` MaxBuildAge config.Duration `toml:"max_build_age"`
@ -52,21 +52,18 @@ type Jenkins struct {
NodeInclude []string `toml:"node_include"` NodeInclude []string `toml:"node_include"`
nodeFilter filter.Filter nodeFilter filter.Filter
tls.ClientConfig
client *client
Log telegraf.Logger `toml:"-"`
semaphore chan struct{} semaphore chan struct{}
} }
// measurement
const (
measurementJenkins = "jenkins"
measurementNode = "jenkins_node"
measurementJob = "jenkins_job"
)
func (*Jenkins) SampleConfig() string { func (*Jenkins) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather implements telegraf.Input interface
func (j *Jenkins) Gather(acc telegraf.Accumulator) error { func (j *Jenkins) Gather(acc telegraf.Accumulator) error {
if j.client == nil { if j.client == nil {
client, err := j.newHTTPClient() client, err := j.newHTTPClient()
@ -109,14 +106,14 @@ func (j *Jenkins) initialize(client *http.Client) error {
} }
if u.Port() == "" { if u.Port() == "" {
if u.Scheme == "http" { if u.Scheme == "http" {
j.Port = "80" j.port = "80"
} else if u.Scheme == "https" { } else if u.Scheme == "https" {
j.Port = "443" j.port = "443"
} }
} else { } else {
j.Port = u.Port() j.port = u.Port()
} }
j.Source = u.Hostname() j.source = u.Hostname()
// init filters // init filters
j.jobFilter, err = filter.NewIncludeExcludeFilter(j.JobInclude, j.JobExclude) j.jobFilter, err = filter.NewIncludeExcludeFilter(j.JobInclude, j.JobExclude)
@ -168,8 +165,8 @@ func (j *Jenkins) gatherNodeData(n node, acc telegraf.Accumulator) error {
tags["status"] = "offline" tags["status"] = "offline"
} }
tags["source"] = j.Source tags["source"] = j.source
tags["port"] = j.Port tags["port"] = j.port
fields := make(map[string]interface{}) fields := make(map[string]interface{})
fields["num_executors"] = n.NumExecutors fields["num_executors"] = n.NumExecutors
@ -218,7 +215,7 @@ func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) {
} }
// get total and busy executors // get total and busy executors
tags := map[string]string{"source": j.Source, "port": j.Port} tags := map[string]string{"source": j.source, "port": j.port}
fields := make(map[string]interface{}) fields := make(map[string]interface{})
fields["busy_executors"] = nodeResp.BusyExecutors fields["busy_executors"] = nodeResp.BusyExecutors
fields["total_executors"] = nodeResp.TotalExecutors fields["total_executors"] = nodeResp.TotalExecutors
@ -314,7 +311,7 @@ func (j *Jenkins) getJobDetail(jr jobRequest, acc telegraf.Accumulator) error {
cutoff := time.Now().Add(-1 * time.Duration(j.MaxBuildAge)) cutoff := time.Now().Add(-1 * time.Duration(j.MaxBuildAge))
// Here we just test // Here we just test
if build.GetTimestamp().Before(cutoff) { if build.getTimestamp().Before(cutoff) {
return nil return nil
} }
@ -389,7 +386,7 @@ type buildResponse struct {
Timestamp int64 `json:"timestamp"` Timestamp int64 `json:"timestamp"`
} }
func (b *buildResponse) GetTimestamp() time.Time { func (b *buildResponse) getTimestamp() time.Time {
return time.Unix(0, b.Timestamp*int64(time.Millisecond)) return time.Unix(0, b.Timestamp*int64(time.Millisecond))
} }
@ -418,7 +415,7 @@ func (jr jobRequest) combinedEscaped() []string {
return jobs return jobs
} }
func (jr jobRequest) URL() string { func (jr jobRequest) url() string {
return "/job/" + strings.Join(jr.combinedEscaped(), "/job/") + jobPath return "/job/" + strings.Join(jr.combinedEscaped(), "/job/") + jobPath
} }
@ -435,13 +432,13 @@ func (jr jobRequest) parentsString() string {
} }
func (j *Jenkins) gatherJobBuild(jr jobRequest, b *buildResponse, acc telegraf.Accumulator) { func (j *Jenkins) gatherJobBuild(jr jobRequest, b *buildResponse, acc telegraf.Accumulator) {
tags := map[string]string{"name": jr.name, "parents": jr.parentsString(), "result": b.Result, "source": j.Source, "port": j.Port} tags := map[string]string{"name": jr.name, "parents": jr.parentsString(), "result": b.Result, "source": j.source, "port": j.port}
fields := make(map[string]interface{}) fields := make(map[string]interface{})
fields["duration"] = b.Duration fields["duration"] = b.Duration
fields["result_code"] = mapResultCode(b.Result) fields["result_code"] = mapResultCode(b.Result)
fields["number"] = b.Number fields["number"] = b.Number
acc.AddFields(measurementJob, fields, tags, b.GetTimestamp()) acc.AddFields(measurementJob, fields, tags, b.getTimestamp())
} }
// perform status mapping // perform status mapping

View File

@ -46,7 +46,7 @@ func TestJobRequest(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
hierarchyName := test.input.hierarchyName() hierarchyName := test.input.hierarchyName()
address := test.input.URL() address := test.input.url()
if hierarchyName != test.hierarchyName { if hierarchyName != test.hierarchyName {
t.Errorf("Expected %s, got %s\n", test.hierarchyName, hierarchyName) t.Errorf("Expected %s, got %s\n", test.hierarchyName, hierarchyName)
} }

View File

@ -19,9 +19,9 @@ import (
var sampleConfig string var sampleConfig string
type JolokiaAgent struct { type JolokiaAgent struct {
DefaultFieldPrefix string DefaultFieldPrefix string `toml:"default_field_prefix"`
DefaultFieldSeparator string DefaultFieldSeparator string `toml:"default_field_separator"`
DefaultTagPrefix string DefaultTagPrefix string `toml:"default_tag_prefix"`
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
Username string `toml:"username"` Username string `toml:"username"`

View File

@ -86,7 +86,7 @@ func TestScalarValues(t *testing.T) {
server := setupServer(response) server := setupServer(response)
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -165,7 +165,7 @@ func TestObjectValues(t *testing.T) {
server := setupServer(string(response)) server := setupServer(string(response))
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -253,7 +253,7 @@ func TestStatusCodes(t *testing.T) {
server := setupServer(response) server := setupServer(response)
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -303,7 +303,7 @@ func TestTagRenaming(t *testing.T) {
server := setupServer(response) server := setupServer(response)
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -396,7 +396,7 @@ func TestFieldRenaming(t *testing.T) {
server := setupServer(response) server := setupServer(response)
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -504,7 +504,7 @@ func TestMetricMbeanMatching(t *testing.T) {
server := setupServer(response) server := setupServer(response)
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -597,7 +597,7 @@ func TestMetricCompaction(t *testing.T) {
server := setupServer(response) server := setupServer(response)
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -643,7 +643,7 @@ func TestJolokia2_ClientAuthRequest(t *testing.T) {
})) }))
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(` plugin := setupPlugin(t, fmt.Sprintf(`
[jolokia2_agent] [jolokia2_agent]
urls = ["%s/jolokia"] urls = ["%s/jolokia"]
username = "sally" username = "sally"
@ -904,7 +904,7 @@ func setupServer(resp string) *httptest.Server {
})) }))
} }
func SetupPlugin(t *testing.T, conf string) telegraf.Input { func setupPlugin(t *testing.T, conf string) telegraf.Input {
table, err := toml.Parse([]byte(conf)) table, err := toml.Parse([]byte(conf))
if err != nil { if err != nil {
t.Fatalf("Unable to parse config! %v", err) t.Fatalf("Unable to parse config! %v", err)

View File

@ -23,7 +23,7 @@ type JolokiaProxy struct {
URL string `toml:"url"` URL string `toml:"url"`
DefaultTargetPassword string `toml:"default_target_password"` DefaultTargetPassword string `toml:"default_target_password"`
DefaultTargetUsername string `toml:"default_target_username"` DefaultTargetUsername string `toml:"default_target_username"`
Targets []JolokiaProxyTargetConfig `toml:"target"` Targets []jolokiaProxyTargetConfig `toml:"target"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
@ -36,7 +36,7 @@ type JolokiaProxy struct {
gatherer *common.Gatherer gatherer *common.Gatherer
} }
type JolokiaProxyTargetConfig struct { type jolokiaProxyTargetConfig struct {
URL string `toml:"url"` URL string `toml:"url"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`

View File

@ -57,7 +57,7 @@ func TestJolokia2_ProxyTargets(t *testing.T) {
server := setupServer(response) server := setupServer(response)
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(config, server.URL)) plugin := setupPlugin(t, fmt.Sprintf(config, server.URL))
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -106,7 +106,7 @@ func TestJolokia2_ClientProxyAuthRequest(t *testing.T) {
})) }))
defer server.Close() defer server.Close()
plugin := SetupPlugin(t, fmt.Sprintf(` plugin := setupPlugin(t, fmt.Sprintf(`
[jolokia2_proxy] [jolokia2_proxy]
url = "%s/jolokia" url = "%s/jolokia"
username = "sally" username = "sally"
@ -169,7 +169,7 @@ func setupServer(resp string) *httptest.Server {
})) }))
} }
func SetupPlugin(t *testing.T, conf string) telegraf.Input { func setupPlugin(t *testing.T, conf string) telegraf.Input {
table, err := toml.Parse([]byte(conf)) table, err := toml.Parse([]byte(conf))
if err != nil { if err != nil {
t.Fatalf("Unable to parse config! %v", err) t.Fatalf("Unable to parse config! %v", err)

View File

@ -2,21 +2,21 @@ package jti_openconfig_telemetry
import "sort" import "sort"
type DataGroup struct { type dataGroup struct {
numKeys int numKeys int
tags map[string]string tags map[string]string
data map[string]interface{} data map[string]interface{}
} }
// Sort the data groups by number of keys // Sort the data groups by number of keys
type CollectionByKeys []DataGroup type collectionByKeys []dataGroup
func (a CollectionByKeys) Len() int { return len(a) } func (a collectionByKeys) Len() int { return len(a) }
func (a CollectionByKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a collectionByKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a CollectionByKeys) Less(i, j int) bool { return a[i].numKeys < a[j].numKeys } func (a collectionByKeys) Less(i, j int) bool { return a[i].numKeys < a[j].numKeys }
// Checks to see if there is already a group with these tags and returns its index. Returns -1 if unavailable. // Checks to see if there is already a group with these tags and returns its index. Returns -1 if unavailable.
func (a CollectionByKeys) IsAvailable(tags map[string]string) *DataGroup { func (a collectionByKeys) isAvailable(tags map[string]string) *dataGroup {
sort.Sort(a) sort.Sort(a)
// Iterate through all the groups and see if we have group with these tags // Iterate through all the groups and see if we have group with these tags
@ -45,14 +45,14 @@ func (a CollectionByKeys) IsAvailable(tags map[string]string) *DataGroup {
} }
// Inserts into already existing group or creates a new group // Inserts into already existing group or creates a new group
func (a CollectionByKeys) Insert(tags map[string]string, data map[string]interface{}) CollectionByKeys { func (a collectionByKeys) insert(tags map[string]string, data map[string]interface{}) collectionByKeys {
// If there is already a group with this set of tags, insert into it. Otherwise create a new group and insert // If there is already a group with this set of tags, insert into it. Otherwise create a new group and insert
if group := a.IsAvailable(tags); group != nil { if group := a.isAvailable(tags); group != nil {
for k, v := range data { for k, v := range data {
group.data[k] = v group.data[k] = v
} }
} else { } else {
a = append(a, DataGroup{len(tags), tags, data}) a = append(a, dataGroup{len(tags), tags, data})
} }
return a return a

View File

@ -31,6 +31,11 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
var (
// Regex to match and extract data points from path value in received key
keyPathRegex = regexp.MustCompile(`/([^/]*)\[([A-Za-z0-9\-/]*=[^\[]*)]`)
)
type OpenConfigTelemetry struct { type OpenConfigTelemetry struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
Sensors []string `toml:"sensors"` Sensors []string `toml:"sensors"`
@ -45,28 +50,29 @@ type OpenConfigTelemetry struct {
KeepAlivePeriod config.Duration `toml:"keep_alive_period"` KeepAlivePeriod config.Duration `toml:"keep_alive_period"`
common_tls.ClientConfig common_tls.ClientConfig
Log telegraf.Logger Log telegraf.Logger `toml:"-"`
sensorsConfig []sensorConfig sensorsConfig []sensorConfig
grpcClientConns []grpcConnection grpcClientConns []grpcConnection
wg *sync.WaitGroup wg *sync.WaitGroup
} }
// Structure to hold sensors path list and measurement name
type sensorConfig struct {
measurementName string
pathList []*telemetry.Path
}
type grpcConnection struct { type grpcConnection struct {
connection *grpc.ClientConn connection *grpc.ClientConn
cancel context.CancelFunc cancel context.CancelFunc
} }
func (g *grpcConnection) Close() { func (g *grpcConnection) close() {
g.connection.Close() g.connection.Close()
g.cancel() g.cancel()
} }
var (
// Regex to match and extract data points from path value in received key
keyPathRegex = regexp.MustCompile(`/([^/]*)\[([A-Za-z0-9\-/]*=[^\[]*)]`)
)
func (*OpenConfigTelemetry) SampleConfig() string { func (*OpenConfigTelemetry) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -82,13 +88,97 @@ func (m *OpenConfigTelemetry) Init() error {
return nil return nil
} }
func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
// Build sensors config
if m.splitSensorConfig() == 0 {
return errors.New("no valid sensor configuration available")
}
// Parse TLS config
var creds credentials.TransportCredentials
if m.EnableTLS {
tlscfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return err
}
creds = credentials.NewTLS(tlscfg)
} else {
creds = insecure.NewCredentials()
}
// Setup the basic connection options
options := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
// Add keep-alive settings
if m.KeepAlivePeriod > 0 {
params := keepalive.ClientParameters{
Time: time.Duration(m.KeepAlivePeriod),
Timeout: 2 * time.Duration(m.KeepAlivePeriod),
}
options = append(options, grpc.WithKeepaliveParams(params))
}
// Connect to given list of servers and start collecting data
var grpcClientConn *grpc.ClientConn
var wg sync.WaitGroup
m.wg = &wg
for _, server := range m.Servers {
ctx, cancel := context.WithCancel(context.Background())
if len(m.Username) > 0 {
ctx = metadata.AppendToOutgoingContext(
ctx,
"username", m.Username,
"password", m.Password,
"clientid", m.ClientID,
)
}
// Extract device address and port
grpcServer, grpcPort, err := net.SplitHostPort(server)
if err != nil {
m.Log.Errorf("Invalid server address: %s", err.Error())
cancel()
continue
}
grpcClientConn, err = grpc.NewClient(server, options...)
if err != nil {
m.Log.Errorf("Failed to connect to %s: %s", server, err.Error())
} else {
m.Log.Debugf("Opened a new gRPC session to %s on port %s", grpcServer, grpcPort)
}
// Add to the list of client connections
connection := grpcConnection{
connection: grpcClientConn,
cancel: cancel,
}
m.grpcClientConns = append(m.grpcClientConns, connection)
if m.Username != "" && m.Password != "" && m.ClientID != "" {
if err := m.authenticate(ctx, server, grpcClientConn); err != nil {
m.Log.Errorf("Error authenticating to %s: %v", grpcServer, err)
continue
}
}
// Subscribe and gather telemetry data
m.collectData(ctx, grpcServer, grpcClientConn, acc)
}
return nil
}
func (m *OpenConfigTelemetry) Gather(_ telegraf.Accumulator) error { func (m *OpenConfigTelemetry) Gather(_ telegraf.Accumulator) error {
return nil return nil
} }
func (m *OpenConfigTelemetry) Stop() { func (m *OpenConfigTelemetry) Stop() {
for _, grpcClientConn := range m.grpcClientConns { for _, grpcClientConn := range m.grpcClientConns {
grpcClientConn.Close() grpcClientConn.close()
} }
m.wg.Wait() m.wg.Wait()
} }
@ -123,11 +213,11 @@ func spitTagsNPath(xmlpath string) (string, map[string]string) {
// Takes in a OC response, extracts tag information from keys and returns a // Takes in a OC response, extracts tag information from keys and returns a
// list of groups with unique sets of tags+values // list of groups with unique sets of tags+values
func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServer string) []DataGroup { func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServer string) []dataGroup {
// Use empty prefix. We will update this when we iterate over key-value pairs // Use empty prefix. We will update this when we iterate over key-value pairs
prefix := "" prefix := ""
dgroups := []DataGroup{} dgroups := []dataGroup{}
for _, v := range r.Kv { for _, v := range r.Kv {
kv := make(map[string]interface{}) kv := make(map[string]interface{})
@ -168,28 +258,22 @@ func (m *OpenConfigTelemetry) extractData(r *telemetry.OpenConfigData, grpcServe
finaltags["path"] = r.Path finaltags["path"] = r.Path
// Insert derived key and value // Insert derived key and value
dgroups = CollectionByKeys(dgroups).Insert(finaltags, kv) dgroups = collectionByKeys(dgroups).insert(finaltags, kv)
// Insert data from message header // Insert data from message header
dgroups = CollectionByKeys(dgroups).Insert(finaltags, dgroups = collectionByKeys(dgroups).insert(finaltags,
map[string]interface{}{"_sequence": r.SequenceNumber}) map[string]interface{}{"_sequence": r.SequenceNumber})
dgroups = CollectionByKeys(dgroups).Insert(finaltags, dgroups = collectionByKeys(dgroups).insert(finaltags,
map[string]interface{}{"_timestamp": r.Timestamp}) map[string]interface{}{"_timestamp": r.Timestamp})
dgroups = CollectionByKeys(dgroups).Insert(finaltags, dgroups = collectionByKeys(dgroups).insert(finaltags,
map[string]interface{}{"_component_id": r.ComponentId}) map[string]interface{}{"_component_id": r.ComponentId})
dgroups = CollectionByKeys(dgroups).Insert(finaltags, dgroups = collectionByKeys(dgroups).insert(finaltags,
map[string]interface{}{"_subcomponent_id": r.SubComponentId}) map[string]interface{}{"_subcomponent_id": r.SubComponentId})
} }
return dgroups return dgroups
} }
// Structure to hold sensors path list and measurement name
type sensorConfig struct {
measurementName string
pathList []*telemetry.Path
}
// Takes in sensor configuration and converts it into slice of sensorConfig objects // Takes in sensor configuration and converts it into slice of sensorConfig objects
func (m *OpenConfigTelemetry) splitSensorConfig() int { func (m *OpenConfigTelemetry) splitSensorConfig() int {
var pathlist []*telemetry.Path var pathlist []*telemetry.Path
@ -366,90 +450,6 @@ func (m *OpenConfigTelemetry) authenticate(ctx context.Context, server string, g
return nil return nil
} }
func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
// Build sensors config
if m.splitSensorConfig() == 0 {
return errors.New("no valid sensor configuration available")
}
// Parse TLS config
var creds credentials.TransportCredentials
if m.EnableTLS {
tlscfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return err
}
creds = credentials.NewTLS(tlscfg)
} else {
creds = insecure.NewCredentials()
}
// Setup the basic connection options
options := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
// Add keep-alive settings
if m.KeepAlivePeriod > 0 {
params := keepalive.ClientParameters{
Time: time.Duration(m.KeepAlivePeriod),
Timeout: 2 * time.Duration(m.KeepAlivePeriod),
}
options = append(options, grpc.WithKeepaliveParams(params))
}
// Connect to given list of servers and start collecting data
var grpcClientConn *grpc.ClientConn
var wg sync.WaitGroup
m.wg = &wg
for _, server := range m.Servers {
ctx, cancel := context.WithCancel(context.Background())
if len(m.Username) > 0 {
ctx = metadata.AppendToOutgoingContext(
ctx,
"username", m.Username,
"password", m.Password,
"clientid", m.ClientID,
)
}
// Extract device address and port
grpcServer, grpcPort, err := net.SplitHostPort(server)
if err != nil {
m.Log.Errorf("Invalid server address: %s", err.Error())
cancel()
continue
}
grpcClientConn, err = grpc.NewClient(server, options...)
if err != nil {
m.Log.Errorf("Failed to connect to %s: %s", server, err.Error())
} else {
m.Log.Debugf("Opened a new gRPC session to %s on port %s", grpcServer, grpcPort)
}
// Add to the list of client connections
connection := grpcConnection{
connection: grpcClientConn,
cancel: cancel,
}
m.grpcClientConns = append(m.grpcClientConns, connection)
if m.Username != "" && m.Password != "" && m.ClientID != "" {
if err := m.authenticate(ctx, server, grpcClientConn); err != nil {
m.Log.Errorf("Error authenticating to %s: %v", grpcServer, err)
continue
}
}
// Subscribe and gather telemetry data
m.collectData(ctx, grpcServer, grpcClientConn, acc)
}
return nil
}
func init() { func init() {
inputs.Add("jti_openconfig_telemetry", func() telegraf.Input { inputs.Add("jti_openconfig_telemetry", func() telegraf.Input {
return &OpenConfigTelemetry{ return &OpenConfigTelemetry{