chore: Fix linter findings for `revive:exported` in `plugins/inputs/o*` (#16224)

This commit is contained in:
Paweł Żak 2024-12-05 17:35:28 +01:00 committed by GitHub
parent 18cdb1a99e
commit 7dc0e18223
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 410 additions and 427 deletions

View File

@ -16,26 +16,24 @@ import (
var sampleConfig string var sampleConfig string
type OpcUA struct { type OpcUA struct {
ReadClientConfig readClientConfig
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
client *ReadClient client *readClient
} }
func (*OpcUA) SampleConfig() string { func (*OpcUA) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init Initialise all required objects
func (o *OpcUA) Init() (err error) { func (o *OpcUA) Init() (err error) {
o.client, err = o.ReadClientConfig.CreateReadClient(o.Log) o.client, err = o.readClientConfig.createReadClient(o.Log)
return err return err
} }
// Gather defines what data the plugin will gather.
func (o *OpcUA) Gather(acc telegraf.Accumulator) error { func (o *OpcUA) Gather(acc telegraf.Accumulator) error {
// Will (re)connect if the client is disconnected // Will (re)connect if the client is disconnected
metrics, err := o.client.CurrentValues() metrics, err := o.client.currentValues()
if err != nil { if err != nil {
return err return err
} }
@ -51,7 +49,7 @@ func (o *OpcUA) Gather(acc telegraf.Accumulator) error {
func init() { func init() {
inputs.Add("opcua", func() telegraf.Input { inputs.Add("opcua", func() telegraf.Input {
return &OpcUA{ return &OpcUA{
ReadClientConfig: ReadClientConfig{ readClientConfig: readClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",

View File

@ -19,19 +19,19 @@ import (
const servicePort = "4840" const servicePort = "4840"
type OPCTags struct { type opcTags struct {
Name string name string
Namespace string namespace string
IdentifierType string identifierType string
Identifier string identifier string
Want interface{} want interface{}
} }
func MapOPCTag(tags OPCTags) (out input.NodeSettings) { func mapOPCTag(tags opcTags) (out input.NodeSettings) {
out.FieldName = tags.Name out.FieldName = tags.name
out.Namespace = tags.Namespace out.Namespace = tags.namespace
out.IdentifierType = tags.IdentifierType out.IdentifierType = tags.identifierType
out.Identifier = tags.Identifier out.Identifier = tags.identifier
return out return out
} }
@ -52,13 +52,13 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) {
require.NoError(t, err, "failed to start container") require.NoError(t, err, "failed to start container")
defer container.Terminate() defer container.Terminate()
testopctags := []OPCTags{ testopctags := []opcTags{
{"ProductName", "1", "i", "2261", "open62541 OPC UA Server"}, {"ProductName", "1", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"}, {"ManufacturerName", "0", "i", "2263", "open62541"},
} }
readConfig := ReadClientConfig{ readConfig := readClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
@ -83,14 +83,14 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) {
} }
for _, tags := range testopctags { for _, tags := range testopctags {
g.Nodes = append(g.Nodes, MapOPCTag(tags)) g.Nodes = append(g.Nodes, mapOPCTag(tags))
} }
readConfig.Groups = append(readConfig.Groups, g) readConfig.Groups = append(readConfig.Groups, g)
logger := &testutil.CaptureLogger{} logger := &testutil.CaptureLogger{}
readClient, err := readConfig.CreateReadClient(logger) readClient, err := readConfig.createReadClient(logger)
require.NoError(t, err) require.NoError(t, err)
err = readClient.Connect() err = readClient.connect()
require.NoError(t, err) require.NoError(t, err)
} }
@ -111,7 +111,7 @@ func TestReadClientIntegration(t *testing.T) {
require.NoError(t, err, "failed to start container") require.NoError(t, err, "failed to start container")
defer container.Terminate() defer container.Terminate()
testopctags := []OPCTags{ testopctags := []opcTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"}, {"ManufacturerName", "0", "i", "2263", "open62541"},
@ -120,7 +120,7 @@ func TestReadClientIntegration(t *testing.T) {
{"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"}, {"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"},
} }
readConfig := ReadClientConfig{ readConfig := readClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
@ -138,17 +138,17 @@ func TestReadClientIntegration(t *testing.T) {
} }
for _, tags := range testopctags { for _, tags := range testopctags {
readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags))
} }
client, err := readConfig.CreateReadClient(testutil.Logger{}) client, err := readConfig.createReadClient(testutil.Logger{})
require.NoError(t, err) require.NoError(t, err)
err = client.Connect() err = client.connect()
require.NoError(t, err, "Connect") require.NoError(t, err)
for i, v := range client.LastReceivedData { for i, v := range client.LastReceivedData {
require.Equal(t, testopctags[i].Want, v.Value) require.Equal(t, testopctags[i].want, v.Value)
} }
} }
@ -168,7 +168,7 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) {
require.NoError(t, container.Start(), "failed to start container") require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate() defer container.Terminate()
testopctags := []OPCTags{ testopctags := []opcTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"}, {"ManufacturerName", "0", "i", "2263", "open62541"},
@ -196,17 +196,17 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) {
for i, x := range testopctags { for i, x := range testopctags {
now := time.Now() now := time.Now()
tags := map[string]string{ tags := map[string]string{
"id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier), "id": fmt.Sprintf("ns=%s;%s=%s", x.namespace, x.identifierType, x.identifier),
} }
fields := map[string]interface{}{ fields := map[string]interface{}{
x.Name: x.Want, x.name: x.want,
"Quality": testopcquality[i], "Quality": testopcquality[i],
"DataType": testopctypes[i], "DataType": testopctypes[i],
} }
expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now)) expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now))
} }
readConfig := ReadClientConfig{ readConfig := readClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
@ -225,13 +225,13 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) {
} }
for _, tags := range testopctags { for _, tags := range testopctags {
readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags))
} }
client, err := readConfig.CreateReadClient(testutil.Logger{}) client, err := readConfig.createReadClient(testutil.Logger{})
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, client.Connect()) require.NoError(t, client.connect())
actualopcmetrics := make([]telegraf.Metric, 0, len(client.LastReceivedData)) actualopcmetrics := make([]telegraf.Metric, 0, len(client.LastReceivedData))
for i := range client.LastReceivedData { for i := range client.LastReceivedData {
@ -258,13 +258,13 @@ func TestReadClientIntegrationWithPasswordAuth(t *testing.T) {
require.NoError(t, err, "failed to start container") require.NoError(t, err, "failed to start container")
defer container.Terminate() defer container.Terminate()
testopctags := []OPCTags{ testopctags := []opcTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"}, {"ManufacturerName", "0", "i", "2263", "open62541"},
} }
readConfig := ReadClientConfig{ readConfig := readClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
@ -284,17 +284,17 @@ func TestReadClientIntegrationWithPasswordAuth(t *testing.T) {
} }
for _, tags := range testopctags { for _, tags := range testopctags {
readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) readConfig.RootNodes = append(readConfig.RootNodes, mapOPCTag(tags))
} }
client, err := readConfig.CreateReadClient(testutil.Logger{}) client, err := readConfig.createReadClient(testutil.Logger{})
require.NoError(t, err) require.NoError(t, err)
err = client.Connect() err = client.connect()
require.NoError(t, err, "Connect") require.NoError(t, err)
for i, v := range client.LastReceivedData { for i, v := range client.LastReceivedData {
require.Equal(t, testopctags[i].Want, v.Value) require.Equal(t, testopctags[i].want, v.Value)
} }
} }
@ -369,17 +369,17 @@ use_unregistered_reads = true
o, ok := c.Inputs[0].Input.(*OpcUA) o, ok := c.Inputs[0].Input.(*OpcUA)
require.True(t, ok) require.True(t, ok)
require.Equal(t, "localhost", o.ReadClientConfig.MetricName) require.Equal(t, "localhost", o.readClientConfig.MetricName)
require.Equal(t, "opc.tcp://localhost:4840", o.ReadClientConfig.Endpoint) require.Equal(t, "opc.tcp://localhost:4840", o.readClientConfig.Endpoint)
require.Equal(t, config.Duration(10*time.Second), o.ReadClientConfig.ConnectTimeout) require.Equal(t, config.Duration(10*time.Second), o.readClientConfig.ConnectTimeout)
require.Equal(t, config.Duration(5*time.Second), o.ReadClientConfig.RequestTimeout) require.Equal(t, config.Duration(5*time.Second), o.readClientConfig.RequestTimeout)
require.Equal(t, "auto", o.ReadClientConfig.SecurityPolicy) require.Equal(t, "auto", o.readClientConfig.SecurityPolicy)
require.Equal(t, "auto", o.ReadClientConfig.SecurityMode) require.Equal(t, "auto", o.readClientConfig.SecurityMode)
require.Equal(t, "/etc/telegraf/cert.pem", o.ReadClientConfig.Certificate) require.Equal(t, "/etc/telegraf/cert.pem", o.readClientConfig.Certificate)
require.Equal(t, "/etc/telegraf/key.pem", o.ReadClientConfig.PrivateKey) require.Equal(t, "/etc/telegraf/key.pem", o.readClientConfig.PrivateKey)
require.Equal(t, "Anonymous", o.ReadClientConfig.AuthMethod) require.Equal(t, "Anonymous", o.readClientConfig.AuthMethod)
require.True(t, o.ReadClientConfig.Username.Empty()) require.True(t, o.readClientConfig.Username.Empty())
require.True(t, o.ReadClientConfig.Password.Empty()) require.True(t, o.readClientConfig.Password.Empty())
require.Equal(t, []input.NodeSettings{ require.Equal(t, []input.NodeSettings{
{ {
FieldName: "name", FieldName: "name",
@ -396,7 +396,7 @@ use_unregistered_reads = true
TagsSlice: [][]string{{"tag0", "val0"}, {"tag00", "val00"}}, TagsSlice: [][]string{{"tag0", "val0"}, {"tag00", "val00"}},
DefaultTags: map[string]string{"tag6": "val6"}, DefaultTags: map[string]string{"tag6": "val6"},
}, },
}, o.ReadClientConfig.RootNodes) }, o.readClientConfig.RootNodes)
require.Equal(t, []input.NodeGroupSettings{ require.Equal(t, []input.NodeGroupSettings{
{ {
MetricName: "foo", MetricName: "foo",
@ -424,10 +424,10 @@ use_unregistered_reads = true
Identifier: "4001", Identifier: "4001",
}}, }},
}, },
}, o.ReadClientConfig.Groups) }, o.readClientConfig.Groups)
require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.ReadClientConfig.Workarounds) require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.readClientConfig.Workarounds)
require.Equal(t, ReadClientWorkarounds{UseUnregisteredReads: true}, o.ReadClientConfig.ReadClientWorkarounds) require.Equal(t, readClientWorkarounds{UseUnregisteredReads: true}, o.readClientConfig.ReadClientWorkarounds)
require.Equal(t, []string{"DataType"}, o.ReadClientConfig.OptionalFields) require.Equal(t, []string{"DataType"}, o.readClientConfig.OptionalFields)
err = o.Init() err = o.Init()
require.NoError(t, err) require.NoError(t, err)
require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes") require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes")

View File

@ -15,33 +15,33 @@ import (
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
) )
type ReadClientWorkarounds struct { type readClientWorkarounds struct {
UseUnregisteredReads bool `toml:"use_unregistered_reads"` UseUnregisteredReads bool `toml:"use_unregistered_reads"`
} }
type ReadClientConfig struct { type readClientConfig struct {
ReadRetryTimeout config.Duration `toml:"read_retry_timeout"` ReadRetryTimeout config.Duration `toml:"read_retry_timeout"`
ReadRetries uint64 `toml:"read_retry_count"` ReadRetries uint64 `toml:"read_retry_count"`
ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"` ReadClientWorkarounds readClientWorkarounds `toml:"request_workarounds"`
input.InputClientConfig input.InputClientConfig
} }
// ReadClient Requests the current values from the required nodes when gather is called. // readClient Requests the current values from the required nodes when gather is called.
type ReadClient struct { type readClient struct {
*input.OpcUAInputClient *input.OpcUAInputClient
ReadRetryTimeout time.Duration ReadRetryTimeout time.Duration
ReadRetries uint64 ReadRetries uint64
ReadSuccess selfstat.Stat ReadSuccess selfstat.Stat
ReadError selfstat.Stat ReadError selfstat.Stat
Workarounds ReadClientWorkarounds Workarounds readClientWorkarounds
// internal values // internal values
reqIDs []*ua.ReadValueID reqIDs []*ua.ReadValueID
ctx context.Context ctx context.Context
} }
func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) { func (rc *readClientConfig) createReadClient(log telegraf.Logger) (*readClient, error) {
inputClient, err := rc.InputClientConfig.CreateInputClient(log) inputClient, err := rc.InputClientConfig.CreateInputClient(log)
if err != nil { if err != nil {
return nil, err return nil, err
@ -55,7 +55,7 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
rc.ReadRetryTimeout = config.Duration(100 * time.Millisecond) rc.ReadRetryTimeout = config.Duration(100 * time.Millisecond)
} }
return &ReadClient{ return &readClient{
OpcUAInputClient: inputClient, OpcUAInputClient: inputClient,
ReadRetryTimeout: time.Duration(rc.ReadRetryTimeout), ReadRetryTimeout: time.Duration(rc.ReadRetryTimeout),
ReadRetries: rc.ReadRetries, ReadRetries: rc.ReadRetries,
@ -65,7 +65,7 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
}, nil }, nil
} }
func (o *ReadClient) Connect() error { func (o *readClient) connect() error {
o.ctx = context.Background() o.ctx = context.Background()
if err := o.OpcUAClient.Connect(o.ctx); err != nil { if err := o.OpcUAClient.Connect(o.ctx); err != nil {
@ -103,14 +103,14 @@ func (o *ReadClient) Connect() error {
return nil return nil
} }
func (o *ReadClient) ensureConnected() error { func (o *readClient) ensureConnected() error {
if o.State() == opcua.Disconnected || o.State() == opcua.Closed { if o.State() == opcua.Disconnected || o.State() == opcua.Closed {
return o.Connect() return o.connect()
} }
return nil return nil
} }
func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) { func (o *readClient) currentValues() ([]telegraf.Metric, error) {
if err := o.ensureConnected(); err != nil { if err := o.ensureConnected(); err != nil {
return nil, err return nil, err
} }
@ -142,7 +142,7 @@ func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) {
return metrics, nil return metrics, nil
} }
func (o *ReadClient) read() error { func (o *readClient) read() error {
req := &ua.ReadRequest{ req := &ua.ReadRequest{
MaxAge: 2000, MaxAge: 2000,
TimestampsToReturn: ua.TimestampsToReturnBoth, TimestampsToReturn: ua.TimestampsToReturnBoth,

View File

@ -15,8 +15,8 @@ import (
) )
type OpcUaListener struct { type OpcUaListener struct {
SubscribeClientConfig subscribeClientConfig
client *SubscribeClient client *subscribeClient
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
@ -36,20 +36,35 @@ func (o *OpcUaListener) Init() (err error) {
default: default:
return fmt.Errorf("unknown setting %q for 'connect_fail_behavior'", o.ConnectFailBehavior) return fmt.Errorf("unknown setting %q for 'connect_fail_behavior'", o.ConnectFailBehavior)
} }
o.client, err = o.SubscribeClientConfig.CreateSubscribeClient(o.Log) o.client, err = o.subscribeClientConfig.createSubscribeClient(o.Log)
return err return err
} }
func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
return o.connect(acc)
}
func (o *OpcUaListener) Gather(acc telegraf.Accumulator) error { func (o *OpcUaListener) Gather(acc telegraf.Accumulator) error {
if o.client.State() == opcua.Connected || o.SubscribeClientConfig.ConnectFailBehavior == "ignore" { if o.client.State() == opcua.Connected || o.subscribeClientConfig.ConnectFailBehavior == "ignore" {
return nil return nil
} }
return o.connect(acc) return o.connect(acc)
} }
func (o *OpcUaListener) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
select {
case <-o.client.stop(ctx):
o.Log.Infof("Unsubscribed OPC UA successfully")
case <-ctx.Done(): // Timeout context
o.Log.Warn("Timeout while stopping OPC UA subscription")
}
cancel()
}
func (o *OpcUaListener) connect(acc telegraf.Accumulator) error { func (o *OpcUaListener) connect(acc telegraf.Accumulator) error {
ctx := context.Background() ctx := context.Background()
ch, err := o.client.StartStreamValues(ctx) ch, err := o.client.startStreamValues(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -68,26 +83,10 @@ func (o *OpcUaListener) connect(acc telegraf.Accumulator) error {
return nil return nil
} }
func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
return o.connect(acc)
}
func (o *OpcUaListener) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
select {
case <-o.client.Stop(ctx):
o.Log.Infof("Unsubscribed OPC UA successfully")
case <-ctx.Done(): // Timeout context
o.Log.Warn("Timeout while stopping OPC UA subscription")
}
cancel()
}
// Add this plugin to telegraf
func init() { func init() {
inputs.Add("opcua_listener", func() telegraf.Input { inputs.Add("opcua_listener", func() telegraf.Input {
return &OpcUaListener{ return &OpcUaListener{
SubscribeClientConfig: SubscribeClientConfig{ subscribeClientConfig: subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",

View File

@ -21,25 +21,25 @@ import (
const servicePort = "4840" const servicePort = "4840"
type OPCTags struct { type opcTags struct {
Name string name string
Namespace string namespace string
IdentifierType string identifierType string
Identifier string identifier string
Want interface{} want interface{}
} }
func MapOPCTag(tags OPCTags) (out input.NodeSettings) { func mapOPCTag(tags opcTags) (out input.NodeSettings) {
out.FieldName = tags.Name out.FieldName = tags.name
out.Namespace = tags.Namespace out.Namespace = tags.namespace
out.IdentifierType = tags.IdentifierType out.IdentifierType = tags.identifierType
out.Identifier = tags.Identifier out.Identifier = tags.identifier
return out return out
} }
func TestInitPluginWithBadConnectFailBehaviorValue(t *testing.T) { func TestInitPluginWithBadConnectFailBehaviorValue(t *testing.T) {
plugin := OpcUaListener{ plugin := OpcUaListener{
SubscribeClientConfig: SubscribeClientConfig{ subscribeClientConfig: subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://notarealserver:4840", Endpoint: "opc.tcp://notarealserver:4840",
@ -69,7 +69,7 @@ func TestStartPlugin(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
plugin := OpcUaListener{ plugin := OpcUaListener{
SubscribeClientConfig: SubscribeClientConfig{ subscribeClientConfig: subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://notarealserver:4840", Endpoint: "opc.tcp://notarealserver:4840",
@ -86,17 +86,17 @@ func TestStartPlugin(t *testing.T) {
}, },
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
testopctags := []OPCTags{ testopctags := []opcTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
} }
for _, tags := range testopctags { for _, tags := range testopctags {
plugin.SubscribeClientConfig.RootNodes = append(plugin.SubscribeClientConfig.RootNodes, MapOPCTag(tags)) plugin.subscribeClientConfig.RootNodes = append(plugin.subscribeClientConfig.RootNodes, mapOPCTag(tags))
} }
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
err := plugin.Start(acc) err := plugin.Start(acc)
require.ErrorContains(t, err, "could not resolve address") require.ErrorContains(t, err, "could not resolve address")
plugin.SubscribeClientConfig.ConnectFailBehavior = "ignore" plugin.subscribeClientConfig.ConnectFailBehavior = "ignore"
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(acc)) require.NoError(t, plugin.Start(acc))
require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State()) require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State())
@ -110,7 +110,7 @@ func TestStartPlugin(t *testing.T) {
wait.ForLog("TCP network layer listening on opc.tcp://"), wait.ForLog("TCP network layer listening on opc.tcp://"),
), ),
} }
plugin.SubscribeClientConfig.ConnectFailBehavior = "retry" plugin.subscribeClientConfig.ConnectFailBehavior = "retry"
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(acc)) require.NoError(t, plugin.Start(acc))
require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State()) require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State())
@ -144,7 +144,7 @@ func TestSubscribeClientIntegration(t *testing.T) {
require.NoError(t, err, "failed to start container") require.NoError(t, err, "failed to start container")
defer container.Terminate() defer container.Terminate()
testopctags := []OPCTags{ testopctags := []opcTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"}, {"ManufacturerName", "0", "i", "2263", "open62541"},
@ -154,12 +154,12 @@ func TestSubscribeClientIntegration(t *testing.T) {
} }
tagsRemaining := make([]string, 0, len(testopctags)) tagsRemaining := make([]string, 0, len(testopctags))
for i, tag := range testopctags { for i, tag := range testopctags {
if tag.Want != nil { if tag.want != nil {
tagsRemaining = append(tagsRemaining, testopctags[i].Name) tagsRemaining = append(tagsRemaining, testopctags[i].name)
} }
} }
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
@ -177,9 +177,9 @@ func TestSubscribeClientIntegration(t *testing.T) {
SubscriptionInterval: 0, SubscriptionInterval: 0,
} }
for _, tags := range testopctags { for _, tags := range testopctags {
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags)) subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, mapOPCTag(tags))
} }
o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) o, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.NoError(t, err) require.NoError(t, err)
// give initial setup a couple extra attempts, as on CircleCI this can be // give initial setup a couple extra attempts, as on CircleCI this can be
@ -188,12 +188,12 @@ func TestSubscribeClientIntegration(t *testing.T) {
return o.SetupOptions() == nil return o.SetupOptions() == nil
}, 5*time.Second, 10*time.Millisecond) }, 5*time.Second, 10*time.Millisecond)
err = o.Connect() err = o.connect()
require.NoError(t, err, "Connection failed") require.NoError(t, err, "Connection failed")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
res, err := o.StartStreamValues(ctx) res, err := o.startStreamValues(ctx)
require.Equal(t, opcua.Connected, o.State()) require.Equal(t, opcua.Connected, o.State())
require.NoError(t, err) require.NoError(t, err)
@ -202,16 +202,16 @@ func TestSubscribeClientIntegration(t *testing.T) {
case m := <-res: case m := <-res:
for fieldName, fieldValue := range m.Fields() { for fieldName, fieldValue := range m.Fields() {
for _, tag := range testopctags { for _, tag := range testopctags {
if fieldName != tag.Name { if fieldName != tag.name {
continue continue
} }
if tag.Want == nil { if tag.want == nil {
t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue) t.Errorf("Tag: %s has value: %v", tag.name, fieldValue)
return return
} }
require.Equal(t, tag.Want, fieldValue) require.Equal(t, tag.want, fieldValue)
newRemaining := make([]string, 0, len(tagsRemaining)) newRemaining := make([]string, 0, len(tagsRemaining))
for _, remainingTag := range tagsRemaining { for _, remainingTag := range tagsRemaining {
@ -257,7 +257,7 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
require.NoError(t, container.Start(), "failed to start container") require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate() defer container.Terminate()
testopctags := []OPCTags{ testopctags := []opcTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"}, {"ManufacturerName", "0", "i", "2263", "open62541"},
@ -285,10 +285,10 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
for i, x := range testopctags { for i, x := range testopctags {
now := time.Now() now := time.Now()
tags := map[string]string{ tags := map[string]string{
"id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier), "id": fmt.Sprintf("ns=%s;%s=%s", x.namespace, x.identifierType, x.identifier),
} }
fields := map[string]interface{}{ fields := map[string]interface{}{
x.Name: x.Want, x.name: x.want,
"Quality": testopcquality[i], "Quality": testopcquality[i],
"DataType": testopctypes[i], "DataType": testopctypes[i],
} }
@ -297,12 +297,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
tagsRemaining := make([]string, 0, len(testopctags)) tagsRemaining := make([]string, 0, len(testopctags))
for i, tag := range testopctags { for i, tag := range testopctags {
if tag.Want != nil { if tag.want != nil {
tagsRemaining = append(tagsRemaining, testopctags[i].Name) tagsRemaining = append(tagsRemaining, testopctags[i].name)
} }
} }
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
@ -321,9 +321,9 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
SubscriptionInterval: 0, SubscriptionInterval: 0,
} }
for _, tags := range testopctags { for _, tags := range testopctags {
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags)) subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, mapOPCTag(tags))
} }
o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) o, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.NoError(t, err) require.NoError(t, err)
// give initial setup a couple extra attempts, as on CircleCI this can be // give initial setup a couple extra attempts, as on CircleCI this can be
@ -332,11 +332,11 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
return o.SetupOptions() == nil return o.SetupOptions() == nil
}, 5*time.Second, 10*time.Millisecond) }, 5*time.Second, 10*time.Millisecond)
require.NoError(t, o.Connect(), "Connection failed") require.NoError(t, o.connect(), "Connection failed")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
res, err := o.StartStreamValues(ctx) res, err := o.startStreamValues(ctx)
require.NoError(t, err) require.NoError(t, err)
for { for {
@ -344,12 +344,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
case m := <-res: case m := <-res:
for fieldName, fieldValue := range m.Fields() { for fieldName, fieldValue := range m.Fields() {
for _, tag := range testopctags { for _, tag := range testopctags {
if fieldName != tag.Name { if fieldName != tag.name {
continue continue
} }
// nil-value tags should not be sent from server, error if one does // nil-value tags should not be sent from server, error if one does
if tag.Want == nil { if tag.want == nil {
t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue) t.Errorf("Tag: %s has value: %v", tag.name, fieldValue)
return return
} }
@ -434,19 +434,19 @@ additional_valid_status_codes = ["0xC0"]
o, ok := c.Inputs[0].Input.(*OpcUaListener) o, ok := c.Inputs[0].Input.(*OpcUaListener)
require.True(t, ok) require.True(t, ok)
require.Equal(t, "localhost", o.SubscribeClientConfig.MetricName) require.Equal(t, "localhost", o.subscribeClientConfig.MetricName)
require.Equal(t, "opc.tcp://localhost:4840", o.SubscribeClientConfig.Endpoint) require.Equal(t, "opc.tcp://localhost:4840", o.subscribeClientConfig.Endpoint)
require.Equal(t, config.Duration(10*time.Second), o.SubscribeClientConfig.ConnectTimeout) require.Equal(t, config.Duration(10*time.Second), o.subscribeClientConfig.ConnectTimeout)
require.Equal(t, config.Duration(5*time.Second), o.SubscribeClientConfig.RequestTimeout) require.Equal(t, config.Duration(5*time.Second), o.subscribeClientConfig.RequestTimeout)
require.Equal(t, config.Duration(200*time.Millisecond), o.SubscribeClientConfig.SubscriptionInterval) require.Equal(t, config.Duration(200*time.Millisecond), o.subscribeClientConfig.SubscriptionInterval)
require.Equal(t, "error", o.SubscribeClientConfig.ConnectFailBehavior) require.Equal(t, "error", o.subscribeClientConfig.ConnectFailBehavior)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityPolicy) require.Equal(t, "auto", o.subscribeClientConfig.SecurityPolicy)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityMode) require.Equal(t, "auto", o.subscribeClientConfig.SecurityMode)
require.Equal(t, "/etc/telegraf/cert.pem", o.SubscribeClientConfig.Certificate) require.Equal(t, "/etc/telegraf/cert.pem", o.subscribeClientConfig.Certificate)
require.Equal(t, "/etc/telegraf/key.pem", o.SubscribeClientConfig.PrivateKey) require.Equal(t, "/etc/telegraf/key.pem", o.subscribeClientConfig.PrivateKey)
require.Equal(t, "Anonymous", o.SubscribeClientConfig.AuthMethod) require.Equal(t, "Anonymous", o.subscribeClientConfig.AuthMethod)
require.True(t, o.SubscribeClientConfig.Username.Empty()) require.True(t, o.subscribeClientConfig.Username.Empty())
require.True(t, o.SubscribeClientConfig.Password.Empty()) require.True(t, o.subscribeClientConfig.Password.Empty())
require.Equal(t, []input.NodeSettings{ require.Equal(t, []input.NodeSettings{
{ {
FieldName: "name", FieldName: "name",
@ -460,7 +460,7 @@ additional_valid_status_codes = ["0xC0"]
IdentifierType: "s", IdentifierType: "s",
Identifier: "two", Identifier: "two",
}, },
}, o.SubscribeClientConfig.RootNodes) }, o.subscribeClientConfig.RootNodes)
require.Equal(t, []input.NodeGroupSettings{ require.Equal(t, []input.NodeGroupSettings{
{ {
MetricName: "foo", MetricName: "foo",
@ -484,9 +484,9 @@ additional_valid_status_codes = ["0xC0"]
TagsSlice: [][]string{{"tag1", "override"}}, TagsSlice: [][]string{{"tag1", "override"}},
}}, }},
}, },
}, o.SubscribeClientConfig.Groups) }, o.subscribeClientConfig.Groups)
require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds) require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.subscribeClientConfig.Workarounds)
require.Equal(t, []string{"DataType"}, o.SubscribeClientConfig.OptionalFields) require.Equal(t, []string{"DataType"}, o.subscribeClientConfig.OptionalFields)
} }
func TestSubscribeClientConfigWithMonitoringParams(t *testing.T) { func TestSubscribeClientConfigWithMonitoringParams(t *testing.T) {
@ -548,11 +548,11 @@ deadband_value = 100.0
}, },
}}, }},
}, },
}, o.SubscribeClientConfig.Groups) }, o.subscribeClientConfig.Groups)
} }
func TestSubscribeClientConfigInvalidTrigger(t *testing.T) { func TestSubscribeClientConfigInvalidTrigger(t *testing.T) {
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",
@ -581,12 +581,12 @@ func TestSubscribeClientConfigInvalidTrigger(t *testing.T) {
}, },
}) })
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) _, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "trigger 'not_valid' not supported, node 'ns=3;i=1'") require.ErrorContains(t, err, "trigger 'not_valid' not supported, node 'ns=3;i=1'")
} }
func TestSubscribeClientConfigMissingTrigger(t *testing.T) { func TestSubscribeClientConfigMissingTrigger(t *testing.T) {
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",
@ -615,12 +615,12 @@ func TestSubscribeClientConfigMissingTrigger(t *testing.T) {
}, },
}) })
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) _, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "trigger '' not supported, node 'ns=3;i=1'") require.ErrorContains(t, err, "trigger '' not supported, node 'ns=3;i=1'")
} }
func TestSubscribeClientConfigInvalidDeadbandType(t *testing.T) { func TestSubscribeClientConfigInvalidDeadbandType(t *testing.T) {
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",
@ -650,12 +650,12 @@ func TestSubscribeClientConfigInvalidDeadbandType(t *testing.T) {
}, },
}) })
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) _, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "deadband_type 'not_valid' not supported, node 'ns=3;i=1'") require.ErrorContains(t, err, "deadband_type 'not_valid' not supported, node 'ns=3;i=1'")
} }
func TestSubscribeClientConfigMissingDeadbandType(t *testing.T) { func TestSubscribeClientConfigMissingDeadbandType(t *testing.T) {
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",
@ -684,12 +684,12 @@ func TestSubscribeClientConfigMissingDeadbandType(t *testing.T) {
}, },
}) })
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) _, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "deadband_type '' not supported, node 'ns=3;i=1'") require.ErrorContains(t, err, "deadband_type '' not supported, node 'ns=3;i=1'")
} }
func TestSubscribeClientConfigInvalidDeadbandValue(t *testing.T) { func TestSubscribeClientConfigInvalidDeadbandValue(t *testing.T) {
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",
@ -721,12 +721,12 @@ func TestSubscribeClientConfigInvalidDeadbandValue(t *testing.T) {
}, },
}) })
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) _, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "negative deadband_value not supported, node 'ns=3;i=1'") require.ErrorContains(t, err, "negative deadband_value not supported, node 'ns=3;i=1'")
} }
func TestSubscribeClientConfigMissingDeadbandValue(t *testing.T) { func TestSubscribeClientConfigMissingDeadbandValue(t *testing.T) {
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",
@ -756,12 +756,12 @@ func TestSubscribeClientConfigMissingDeadbandValue(t *testing.T) {
}, },
}) })
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) _, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "deadband_value was not set, node 'ns=3;i=1'") require.ErrorContains(t, err, "deadband_value was not set, node 'ns=3;i=1'")
} }
func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) { func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) {
subscribeConfig := SubscribeClientConfig{ subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{ InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{ OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840", Endpoint: "opc.tcp://localhost:4840",
@ -799,7 +799,7 @@ func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) {
}, },
}) })
subClient, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) subClient, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &ua.MonitoringParameters{ require.Equal(t, &ua.MonitoringParameters{
SamplingInterval: 50, SamplingInterval: 50,

View File

@ -16,15 +16,15 @@ import (
"github.com/influxdata/telegraf/plugins/common/opcua/input" "github.com/influxdata/telegraf/plugins/common/opcua/input"
) )
type SubscribeClientConfig struct { type subscribeClientConfig struct {
input.InputClientConfig input.InputClientConfig
SubscriptionInterval config.Duration `toml:"subscription_interval"` SubscriptionInterval config.Duration `toml:"subscription_interval"`
ConnectFailBehavior string `toml:"connect_fail_behavior"` ConnectFailBehavior string `toml:"connect_fail_behavior"`
} }
type SubscribeClient struct { type subscribeClient struct {
*input.OpcUAInputClient *input.OpcUAInputClient
Config SubscribeClientConfig Config subscribeClientConfig
sub *opcua.Subscription sub *opcua.Subscription
monitoredItemsReqs []*ua.MonitoredItemCreateRequest monitoredItemsReqs []*ua.MonitoredItemCreateRequest
@ -81,7 +81,7 @@ func assignConfigValuesToRequest(req *ua.MonitoredItemCreateRequest, monParams *
return nil return nil
} }
func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*SubscribeClient, error) { func (sc *subscribeClientConfig) createSubscribeClient(log telegraf.Logger) (*subscribeClient, error) {
client, err := sc.InputClientConfig.CreateInputClient(log) client, err := sc.InputClientConfig.CreateInputClient(log)
if err != nil { if err != nil {
return nil, err return nil, err
@ -92,7 +92,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
} }
processingCtx, processingCancel := context.WithCancel(context.Background()) processingCtx, processingCancel := context.WithCancel(context.Background())
subClient := &SubscribeClient{ subClient := &subscribeClient{
OpcUAInputClient: client, OpcUAInputClient: client,
Config: *sc, Config: *sc,
monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)), monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)),
@ -118,7 +118,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
return subClient, nil return subClient, nil
} }
func (o *SubscribeClient) Connect() error { func (o *subscribeClient) connect() error {
err := o.OpcUAClient.Connect(o.ctx) err := o.OpcUAClient.Connect(o.ctx)
if err != nil { if err != nil {
return err return err
@ -137,7 +137,7 @@ func (o *SubscribeClient) Connect() error {
return nil return nil
} }
func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} { func (o *subscribeClient) stop(ctx context.Context) <-chan struct{} {
o.Log.Debugf("Stopping OPC subscription...") o.Log.Debugf("Stopping OPC subscription...")
if o.State() != opcuaclient.Connected { if o.State() != opcuaclient.Connected {
return nil return nil
@ -152,8 +152,8 @@ func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} {
return closing return closing
} }
func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) { func (o *subscribeClient) startStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) {
err := o.Connect() err := o.connect()
if err != nil { if err != nil {
switch o.Config.ConnectFailBehavior { switch o.Config.ConnectFailBehavior {
case "retry": case "retry":
@ -191,7 +191,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
return o.metrics, nil return o.metrics, nil
} }
func (o *SubscribeClient) processReceivedNotifications() { func (o *subscribeClient) processReceivedNotifications() {
for { for {
select { select {
case <-o.ctx.Done(): case <-o.ctx.Done():

View File

@ -7,7 +7,7 @@ import (
"strconv" "strconv"
"strings" "strings"
ldap "github.com/go-ldap/ldap/v3" "github.com/go-ldap/ldap/v3"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/common/tls"
@ -17,23 +17,11 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type Openldap struct { var (
Host string searchBase = "cn=Monitor"
Port int searchFilter = "(|(objectClass=monitorCounterObject)(objectClass=monitorOperation)(objectClass=monitoredObject))"
SSL string `toml:"ssl" deprecated:"1.7.0;1.35.0;use 'tls' instead"` searchAttrs = []string{"monitorCounter", "monitorOpInitiated", "monitorOpCompleted", "monitoredInfo"}
TLS string `toml:"tls"` attrTranslate = map[string]string{
InsecureSkipVerify bool
SSLCA string `toml:"ssl_ca" deprecated:"1.7.0;1.35.0;use 'tls_ca' instead"`
TLSCA string `toml:"tls_ca"`
BindDn string
BindPassword string
ReverseMetricNames bool
}
var searchBase = "cn=Monitor"
var searchFilter = "(|(objectClass=monitorCounterObject)(objectClass=monitorOperation)(objectClass=monitoredObject))"
var searchAttrs = []string{"monitorCounter", "monitorOpInitiated", "monitorOpCompleted", "monitoredInfo"}
var attrTranslate = map[string]string{
"monitorCounter": "", "monitorCounter": "",
"monitoredInfo": "", "monitoredInfo": "",
"monitorOpInitiated": "_initiated", "monitorOpInitiated": "_initiated",
@ -45,28 +33,25 @@ var attrTranslate = map[string]string{
"olmMDBReadersUsed": "_mdb_readers_used", "olmMDBReadersUsed": "_mdb_readers_used",
"olmMDBEntries": "_mdb_entries", "olmMDBEntries": "_mdb_entries",
} }
)
// return an initialized Openldap type Openldap struct {
func NewOpenldap() *Openldap { Host string `toml:"host"`
return &Openldap{ Port int `toml:"port"`
Host: "localhost", SSL string `toml:"ssl" deprecated:"1.7.0;1.35.0;use 'tls' instead"`
Port: 389, TLS string `toml:"tls"`
SSL: "", InsecureSkipVerify bool `toml:"insecure_skip_verify"`
TLS: "", SSLCA string `toml:"ssl_ca" deprecated:"1.7.0;1.35.0;use 'tls_ca' instead"`
InsecureSkipVerify: false, TLSCA string `toml:"tls_ca"`
SSLCA: "", BindDn string `toml:"bind_dn"`
TLSCA: "", BindPassword string `toml:"bind_password"`
BindDn: "", ReverseMetricNames bool `toml:"reverse_metric_names"`
BindPassword: "",
ReverseMetricNames: false,
}
} }
func (*Openldap) SampleConfig() string { func (*Openldap) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// gather metrics
func (o *Openldap) Gather(acc telegraf.Accumulator) error { func (o *Openldap) Gather(acc telegraf.Accumulator) error {
if o.TLS == "" { if o.TLS == "" {
o.TLS = o.SSL o.TLS = o.SSL
@ -198,6 +183,21 @@ func dnToMetric(dn string, o *Openldap) string {
return strings.ReplaceAll(metricName, ",", "") return strings.ReplaceAll(metricName, ",", "")
} }
func init() { func newOpenldap() *Openldap {
inputs.Add("openldap", func() telegraf.Input { return NewOpenldap() }) return &Openldap{
Host: "localhost",
Port: 389,
SSL: "",
TLS: "",
InsecureSkipVerify: false,
SSLCA: "",
TLSCA: "",
BindDn: "",
BindPassword: "",
ReverseMetricNames: false,
}
}
func init() {
inputs.Add("openldap", func() telegraf.Input { return newOpenldap() })
} }

View File

@ -20,60 +20,38 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
var (
defaultBinary = "/usr/sbin/ntpctl"
defaultTimeout = config.Duration(5 * time.Second)
// Mapping of the ntpctl tag key to the index in the command output // Mapping of the ntpctl tag key to the index in the command output
var tagI = map[string]int{ tagI = map[string]int{
"stratum": 2, "stratum": 2,
} }
// Mapping of float metrics to their index in the command output // Mapping of float metrics to their index in the command output
var floatI = map[string]int{ floatI = map[string]int{
"offset": 5, "offset": 5,
"delay": 6, "delay": 6,
"jitter": 7, "jitter": 7,
} }
// Mapping of int metrics to their index in the command output // Mapping of int metrics to their index in the command output
var intI = map[string]int{ intI = map[string]int{
"wt": 0, "wt": 0,
"tl": 1, "tl": 1,
"next": 3, "next": 3,
"poll": 4, "poll": 4,
} }
)
type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error)
// Openntpd is used to store configuration values
type Openntpd struct { type Openntpd struct {
Binary string Binary string `toml:"binary"`
Timeout config.Duration Timeout config.Duration `toml:"timeout"`
UseSudo bool UseSudo bool `toml:"use_sudo"`
run runner run runner
} }
var defaultBinary = "/usr/sbin/ntpctl" type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error)
var defaultTimeout = config.Duration(5 * time.Second)
// Shell out to ntpctl and return the output
func openntpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) {
cmdArgs := []string{"-s", "peers"}
cmd := exec.Command(cmdName, cmdArgs...)
if useSudo {
cmdArgs = append([]string{cmdName}, cmdArgs...)
cmd = exec.Command("sudo", cmdArgs...)
}
var out bytes.Buffer
cmd.Stdout = &out
err := internal.RunTimeout(cmd, time.Duration(timeout))
if err != nil {
return &out, fmt.Errorf("error running ntpctl: %w", err)
}
return &out, nil
}
func (*Openntpd) SampleConfig() string { func (*Openntpd) SampleConfig() string {
return sampleConfig return sampleConfig
@ -190,6 +168,27 @@ func (n *Openntpd) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Shell out to ntpctl and return the output
func openntpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) {
cmdArgs := []string{"-s", "peers"}
cmd := exec.Command(cmdName, cmdArgs...)
if useSudo {
cmdArgs = append([]string{cmdName}, cmdArgs...)
cmd = exec.Command("sudo", cmdArgs...)
}
var out bytes.Buffer
cmd.Stdout = &out
err := internal.RunTimeout(cmd, time.Duration(timeout))
if err != nil {
return &out, fmt.Errorf("error running ntpctl: %w", err)
}
return &out, nil
}
func init() { func init() {
inputs.Add("openntpd", func() telegraf.Input { inputs.Add("openntpd", func() telegraf.Input {
return &Openntpd{ return &Openntpd{

View File

@ -10,7 +10,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func OpenntpdCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) { func openntpdCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) {
return func(string, config.Duration, bool) (*bytes.Buffer, error) { return func(string, config.Duration, bool) (*bytes.Buffer, error) {
return bytes.NewBufferString(output), nil return bytes.NewBufferString(output), nil
} }
@ -19,7 +19,7 @@ func OpenntpdCTL(output string) func(string, config.Duration, bool) (*bytes.Buff
func TestParseSimpleOutput(t *testing.T) { func TestParseSimpleOutput(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &Openntpd{ v := &Openntpd{
run: OpenntpdCTL(simpleOutput), run: openntpdCTL(simpleOutput),
} }
err := v.Gather(acc) err := v.Gather(acc)
@ -50,7 +50,7 @@ func TestParseSimpleOutput(t *testing.T) {
func TestParseSimpleOutputwithStatePrefix(t *testing.T) { func TestParseSimpleOutputwithStatePrefix(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &Openntpd{ v := &Openntpd{
run: OpenntpdCTL(simpleOutputwithStatePrefix), run: openntpdCTL(simpleOutputwithStatePrefix),
} }
err := v.Gather(acc) err := v.Gather(acc)
@ -82,7 +82,7 @@ func TestParseSimpleOutputwithStatePrefix(t *testing.T) {
func TestParseSimpleOutputInvalidPeer(t *testing.T) { func TestParseSimpleOutputInvalidPeer(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &Openntpd{ v := &Openntpd{
run: OpenntpdCTL(simpleOutputInvalidPeer), run: openntpdCTL(simpleOutputInvalidPeer),
} }
err := v.Gather(acc) err := v.Gather(acc)
@ -110,7 +110,7 @@ func TestParseSimpleOutputInvalidPeer(t *testing.T) {
func TestParseSimpleOutputServersDNSError(t *testing.T) { func TestParseSimpleOutputServersDNSError(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &Openntpd{ v := &Openntpd{
run: OpenntpdCTL(simpleOutputServersDNSError), run: openntpdCTL(simpleOutputServersDNSError),
} }
err := v.Gather(acc) err := v.Gather(acc)
@ -152,7 +152,7 @@ func TestParseSimpleOutputServersDNSError(t *testing.T) {
func TestParseSimpleOutputServerDNSError(t *testing.T) { func TestParseSimpleOutputServerDNSError(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &Openntpd{ v := &Openntpd{
run: OpenntpdCTL(simpleOutputServerDNSError), run: openntpdCTL(simpleOutputServerDNSError),
} }
err := v.Gather(acc) err := v.Gather(acc)
@ -180,7 +180,7 @@ func TestParseSimpleOutputServerDNSError(t *testing.T) {
func TestParseFullOutput(t *testing.T) { func TestParseFullOutput(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &Openntpd{ v := &Openntpd{
run: OpenntpdCTL(fullOutput), run: openntpdCTL(fullOutput),
} }
err := v.Gather(acc) err := v.Gather(acc)

View File

@ -5,9 +5,9 @@ import (
"fmt" "fmt"
) )
type BucketAggregationRequest map[string]*aggregationFunction type bucketAggregationRequest map[string]*aggregationFunction
func (b BucketAggregationRequest) AddAggregation(name, aggType, field string) error { func (b bucketAggregationRequest) addAggregation(name, aggType, field string) error {
switch aggType { switch aggType {
case "terms": case "terms":
default: default:
@ -22,11 +22,11 @@ func (b BucketAggregationRequest) AddAggregation(name, aggType, field string) er
return nil return nil
} }
func (b BucketAggregationRequest) AddNestedAggregation(name string, a AggregationRequest) { func (b bucketAggregationRequest) addNestedAggregation(name string, a aggregationRequest) {
b[name].nested = a b[name].nested = a
} }
func (b BucketAggregationRequest) BucketSize(name string, size int) error { func (b bucketAggregationRequest) bucketSize(name string, size int) error {
if size <= 0 { if size <= 0 {
return errors.New("invalid size; must be integer value > 0") return errors.New("invalid size; must be integer value > 0")
} }
@ -35,11 +35,11 @@ func (b BucketAggregationRequest) BucketSize(name string, size int) error {
return fmt.Errorf("aggregation %q not found", name) return fmt.Errorf("aggregation %q not found", name)
} }
b[name].Size(size) b[name].setSize(size)
return nil return nil
} }
func (b BucketAggregationRequest) Missing(name, missing string) { func (b bucketAggregationRequest) missing(name, missing string) {
b[name].Missing(missing) b[name].setMissing(missing)
} }

View File

@ -4,14 +4,8 @@ import (
"encoding/json" "encoding/json"
) )
type AggregationRequest interface { type aggregationRequest interface {
AddAggregation(string, string, string) error addAggregation(string, string, string) error
}
type NestedAggregation interface {
Nested(string, AggregationRequest)
Missing(string)
Size(int)
} }
type aggregationFunction struct { type aggregationFunction struct {
@ -20,7 +14,7 @@ type aggregationFunction struct {
size int size int
missing string missing string
nested AggregationRequest nested aggregationRequest
} }
func (a *aggregationFunction) MarshalJSON() ([]byte, error) { func (a *aggregationFunction) MarshalJSON() ([]byte, error) {
@ -45,11 +39,11 @@ func (a *aggregationFunction) MarshalJSON() ([]byte, error) {
return json.Marshal(agg) return json.Marshal(agg)
} }
func (a *aggregationFunction) Size(size int) { func (a *aggregationFunction) setSize(size int) {
a.size = size a.size = size
} }
func (a *aggregationFunction) Missing(missing string) { func (a *aggregationFunction) setMissing(missing string) {
a.missing = missing a.missing = missing
} }

View File

@ -2,9 +2,9 @@ package opensearch_query
import "fmt" import "fmt"
type MetricAggregationRequest map[string]*aggregationFunction type metricAggregationRequest map[string]*aggregationFunction
func (m MetricAggregationRequest) AddAggregation(name, aggType, field string) error { func (m metricAggregationRequest) addAggregation(name, aggType, field string) error {
if t := getAggregationFunctionType(aggType); t != "metric" { if t := getAggregationFunctionType(aggType); t != "metric" {
return fmt.Errorf("aggregation function %q not supported", aggType) return fmt.Errorf("aggregation function %q not supported", aggType)
} }

View File

@ -36,7 +36,7 @@ type bucketData struct {
subaggregation aggregation subaggregation aggregation
} }
func (a *aggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement string) error { func (a *aggregationResponse) getMetrics(acc telegraf.Accumulator, measurement string) error {
// Simple case (no aggregations) // Simple case (no aggregations)
if a.Aggregations == nil { if a.Aggregations == nil {
tags := make(map[string]string) tags := make(map[string]string)
@ -47,20 +47,20 @@ func (a *aggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement s
return nil return nil
} }
return a.Aggregations.GetMetrics(acc, measurement, a.Hits.TotalHits.Value, make(map[string]string)) return a.Aggregations.getMetrics(acc, measurement, a.Hits.TotalHits.Value, make(map[string]string))
} }
func (a *aggregation) GetMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error { func (a *aggregation) getMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error {
var err error var err error
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for name, agg := range *a { for name, agg := range *a {
if agg.IsAggregation() { if agg.isAggregation() {
for _, bucket := range agg.buckets { for _, bucket := range agg.buckets {
tt := map[string]string{name: bucket.Key} tt := map[string]string{name: bucket.Key}
for k, v := range tags { for k, v := range tags {
tt[k] = v tt[k] = v
} }
err = bucket.subaggregation.GetMetrics(acc, measurement, bucket.DocumentCount, tt) err = bucket.subaggregation.getMetrics(acc, measurement, bucket.DocumentCount, tt)
if err != nil { if err != nil {
return err return err
} }
@ -101,7 +101,7 @@ func (a *aggregateValue) UnmarshalJSON(bytes []byte) error {
return json.Unmarshal(bytes, &a.metrics) return json.Unmarshal(bytes, &a.metrics)
} }
func (a *aggregateValue) IsAggregation() bool { func (a *aggregateValue) isAggregation() bool {
return !(a.buckets == nil) return !(a.buckets == nil)
} }

View File

@ -25,7 +25,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// OpensearchQuery struct
type OpensearchQuery struct { type OpensearchQuery struct {
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
Username config.Secret `toml:"username"` Username config.Secret `toml:"username"`
@ -41,7 +40,6 @@ type OpensearchQuery struct {
osClient *opensearch.Client osClient *opensearch.Client
} }
// osAggregation struct
type osAggregation struct { type osAggregation struct {
Index string `toml:"index"` Index string `toml:"index"`
MeasurementName string `toml:"measurement_name"` MeasurementName string `toml:"measurement_name"`
@ -56,14 +54,13 @@ type osAggregation struct {
MissingTagValue string `toml:"missing_tag_value"` MissingTagValue string `toml:"missing_tag_value"`
mapMetricFields map[string]string mapMetricFields map[string]string
aggregation AggregationRequest aggregation aggregationRequest
} }
func (*OpensearchQuery) SampleConfig() string { func (*OpensearchQuery) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init the plugin.
func (o *OpensearchQuery) Init() error { func (o *OpensearchQuery) Init() error {
if o.URLs == nil { if o.URLs == nil {
return errors.New("no urls defined") return errors.New("no urls defined")
@ -89,19 +86,21 @@ func (o *OpensearchQuery) Init() error {
return nil return nil
} }
func (o *OpensearchQuery) initAggregation(agg osAggregation, i int) (err error) { func (o *OpensearchQuery) Gather(acc telegraf.Accumulator) error {
for _, metricField := range agg.MetricFields { var wg sync.WaitGroup
if _, ok := agg.mapMetricFields[metricField]; !ok {
return fmt.Errorf("metric field %q not found on index %q", metricField, agg.Index)
}
}
err = agg.buildAggregationQuery() for _, agg := range o.Aggregations {
wg.Add(1)
go func(agg osAggregation) {
defer wg.Done()
err := o.osAggregationQuery(acc, agg)
if err != nil { if err != nil {
return fmt.Errorf("error building aggregation: %w", err) acc.AddError(fmt.Errorf("opensearch query aggregation %q: %w ", agg.MeasurementName, err))
}
}(agg)
} }
o.Aggregations[i] = agg wg.Wait()
return nil return nil
} }
@ -136,22 +135,19 @@ func (o *OpensearchQuery) newClient() error {
return err return err
} }
// Gather writes the results of the queries from OpenSearch to the Accumulator. func (o *OpensearchQuery) initAggregation(agg osAggregation, i int) (err error) {
func (o *OpensearchQuery) Gather(acc telegraf.Accumulator) error { for _, metricField := range agg.MetricFields {
var wg sync.WaitGroup if _, ok := agg.mapMetricFields[metricField]; !ok {
return fmt.Errorf("metric field %q not found on index %q", metricField, agg.Index)
}
}
for _, agg := range o.Aggregations { err = agg.buildAggregationQuery()
wg.Add(1)
go func(agg osAggregation) {
defer wg.Done()
err := o.osAggregationQuery(acc, agg)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("opensearch query aggregation %q: %w ", agg.MeasurementName, err)) return fmt.Errorf("error building aggregation: %w", err)
}
}(agg)
} }
wg.Wait() o.Aggregations[i] = agg
return nil return nil
} }
@ -164,16 +160,7 @@ func (o *OpensearchQuery) osAggregationQuery(acc telegraf.Accumulator, aggregati
return err return err
} }
return searchResult.GetMetrics(acc, aggregation.MeasurementName) return searchResult.getMetrics(acc, aggregation.MeasurementName)
}
func init() {
inputs.Add("opensearch_query", func() telegraf.Input {
return &OpensearchQuery{
Timeout: config.Duration(time.Second * 5),
HealthCheckInterval: config.Duration(time.Second * 10),
}
})
} }
func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation osAggregation) (*aggregationResponse, error) { func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation osAggregation) (*aggregationResponse, error) {
@ -184,13 +171,13 @@ func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation o
filterQuery = "*" filterQuery = "*"
} }
aq := &Query{ aq := &query{
Size: 0, Size: 0,
Aggregations: aggregation.aggregation, Aggregations: aggregation.aggregation,
Query: nil, Query: nil,
} }
boolQuery := &BoolQuery{ boolQuery := &boolQuery{
FilterQueryString: filterQuery, FilterQueryString: filterQuery,
TimestampField: aggregation.DateField, TimestampField: aggregation.DateField,
TimeRangeFrom: from, TimeRangeFrom: from,
@ -231,8 +218,8 @@ func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation o
} }
func (aggregation *osAggregation) buildAggregationQuery() error { func (aggregation *osAggregation) buildAggregationQuery() error {
var agg AggregationRequest var agg aggregationRequest
agg = &MetricAggregationRequest{} agg = &metricAggregationRequest{}
// create one aggregation per metric field found & function defined for numeric fields // create one aggregation per metric field found & function defined for numeric fields
for k, v := range aggregation.mapMetricFields { for k, v := range aggregation.mapMetricFields {
@ -242,7 +229,7 @@ func (aggregation *osAggregation) buildAggregationQuery() error {
continue continue
} }
err := agg.AddAggregation(strings.ReplaceAll(k, ".", "_")+"_"+aggregation.MetricFunction, aggregation.MetricFunction, k) err := agg.addAggregation(strings.ReplaceAll(k, ".", "_")+"_"+aggregation.MetricFunction, aggregation.MetricFunction, k)
if err != nil { if err != nil {
return err return err
} }
@ -250,21 +237,21 @@ func (aggregation *osAggregation) buildAggregationQuery() error {
// create a terms aggregation per tag // create a terms aggregation per tag
for _, term := range aggregation.Tags { for _, term := range aggregation.Tags {
bucket := &BucketAggregationRequest{} bucket := &bucketAggregationRequest{}
name := strings.ReplaceAll(term, ".", "_") name := strings.ReplaceAll(term, ".", "_")
err := bucket.AddAggregation(name, "terms", term) err := bucket.addAggregation(name, "terms", term)
if err != nil { if err != nil {
return err return err
} }
err = bucket.BucketSize(name, 1000) err = bucket.bucketSize(name, 1000)
if err != nil { if err != nil {
return err return err
} }
if aggregation.IncludeMissingTag && aggregation.MissingTagValue != "" { if aggregation.IncludeMissingTag && aggregation.MissingTagValue != "" {
bucket.Missing(name, aggregation.MissingTagValue) bucket.missing(name, aggregation.MissingTagValue)
} }
bucket.AddNestedAggregation(name, agg) bucket.addNestedAggregation(name, agg)
agg = bucket agg = bucket
} }
@ -273,3 +260,12 @@ func (aggregation *osAggregation) buildAggregationQuery() error {
return nil return nil
} }
func init() {
inputs.Add("opensearch_query", func() telegraf.Input {
return &OpensearchQuery{
Timeout: config.Duration(time.Second * 5),
HealthCheckInterval: config.Duration(time.Second * 10),
}
})
}

View File

@ -12,13 +12,14 @@ import (
"time" "time"
"github.com/docker/go-connections/nat" "github.com/docker/go-connections/nat"
"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
) )
const ( const (
@ -674,18 +675,18 @@ func TestOpensearchQueryIntegration(t *testing.T) {
} }
func TestMetricAggregationMarshal(t *testing.T) { func TestMetricAggregationMarshal(t *testing.T) {
agg := &MetricAggregationRequest{} agg := &metricAggregationRequest{}
err := agg.AddAggregation("sum_taxful_total_price", "sum", "taxful_total_price") err := agg.addAggregation("sum_taxful_total_price", "sum", "taxful_total_price")
require.NoError(t, err) require.NoError(t, err)
_, err = json.Marshal(agg) _, err = json.Marshal(agg)
require.NoError(t, err) require.NoError(t, err)
bucket := &BucketAggregationRequest{} bucket := &bucketAggregationRequest{}
err = bucket.AddAggregation("terms_by_currency", "terms", "currency") err = bucket.addAggregation("terms_by_currency", "terms", "currency")
require.NoError(t, err) require.NoError(t, err)
bucket.AddNestedAggregation("terms_by_currency", agg) bucket.addNestedAggregation("terms_by_currency", agg)
_, err = json.Marshal(bucket) _, err = json.Marshal(bucket)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -5,13 +5,13 @@ import (
"time" "time"
) )
type Query struct { type query struct {
Size int `json:"size"` Size int `json:"size"`
Aggregations AggregationRequest `json:"aggregations"` Aggregations aggregationRequest `json:"aggregations"`
Query interface{} `json:"query,omitempty"` Query interface{} `json:"query,omitempty"`
} }
type BoolQuery struct { type boolQuery struct {
FilterQueryString string FilterQueryString string
TimestampField string TimestampField string
TimeRangeFrom time.Time TimeRangeFrom time.Time
@ -19,7 +19,7 @@ type BoolQuery struct {
DateFieldFormat string DateFieldFormat string
} }
func (b *BoolQuery) MarshalJSON() ([]byte, error) { func (b *boolQuery) MarshalJSON() ([]byte, error) {
// Construct range // Construct range
dateTimeRange := map[string]interface{}{ dateTimeRange := map[string]interface{}{
"from": b.TimeRangeFrom, "from": b.TimeRangeFrom,

View File

@ -21,49 +21,29 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) var (
defaultBinary = "/usr/sbin/smtpctl"
defaultTimeout = config.Duration(time.Second)
)
// Opensmtpd is used to store configuration values
type Opensmtpd struct { type Opensmtpd struct {
Binary string Binary string `toml:"binary"`
Timeout config.Duration Timeout config.Duration `toml:"timeout"`
UseSudo bool UseSudo bool `toml:"use_sudo"`
run runner run runner
} }
var defaultBinary = "/usr/sbin/smtpctl" type runner func(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error)
var defaultTimeout = config.Duration(time.Second)
// Shell out to opensmtpd_stat and return the output
func opensmtpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) {
cmdArgs := []string{"show", "stats"}
cmd := exec.Command(cmdName, cmdArgs...)
if useSudo {
cmdArgs = append([]string{cmdName}, cmdArgs...)
cmd = exec.Command("sudo", cmdArgs...)
}
var out bytes.Buffer
cmd.Stdout = &out
err := internal.RunTimeout(cmd, time.Duration(timeout))
if err != nil {
return &out, fmt.Errorf("error running smtpctl: %w", err)
}
return &out, nil
}
// Gather collects the configured stats from smtpctl and adds them to the
// Accumulator
func (*Opensmtpd) SampleConfig() string { func (*Opensmtpd) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// All the dots in stat name will replaced by underscores. Histogram statistics will not be collected.
func (s *Opensmtpd) Gather(acc telegraf.Accumulator) error { func (s *Opensmtpd) Gather(acc telegraf.Accumulator) error {
// All the dots in stat name will be replaced by underscores.
// Histogram statistics will not be collected.
// Always exclude uptime.human statistics // Always exclude uptime.human statistics
statExcluded := []string{"uptime.human"} statExcluded := []string{"uptime.human"}
filterExcluded, err := filter.Compile(statExcluded) filterExcluded, err := filter.Compile(statExcluded)
@ -108,6 +88,27 @@ func (s *Opensmtpd) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Shell out to opensmtpd_stat and return the output
func opensmtpdRunner(cmdName string, timeout config.Duration, useSudo bool) (*bytes.Buffer, error) {
cmdArgs := []string{"show", "stats"}
cmd := exec.Command(cmdName, cmdArgs...)
if useSudo {
cmdArgs = append([]string{cmdName}, cmdArgs...)
cmd = exec.Command("sudo", cmdArgs...)
}
var out bytes.Buffer
cmd.Stdout = &out
err := internal.RunTimeout(cmd, time.Duration(timeout))
if err != nil {
return &out, fmt.Errorf("error running smtpctl: %w", err)
}
return &out, nil
}
func init() { func init() {
inputs.Add("opensmtpd", func() telegraf.Input { inputs.Add("opensmtpd", func() telegraf.Input {
return &Opensmtpd{ return &Opensmtpd{

View File

@ -10,7 +10,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func SMTPCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) { func smtpCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer, error) {
return func(string, config.Duration, bool) (*bytes.Buffer, error) { return func(string, config.Duration, bool) (*bytes.Buffer, error) {
return bytes.NewBufferString(output), nil return bytes.NewBufferString(output), nil
} }
@ -19,7 +19,7 @@ func SMTPCTL(output string) func(string, config.Duration, bool) (*bytes.Buffer,
func TestFilterSomeStats(t *testing.T) { func TestFilterSomeStats(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &Opensmtpd{ v := &Opensmtpd{
run: SMTPCTL(fullOutput), run: smtpCTL(fullOutput),
} }
err := v.Gather(acc) err := v.Gather(acc)

View File

@ -57,7 +57,6 @@ var (
typeStorage = regexp.MustCompile(`_errors$|_read$|_read_req$|_write$|_write_req$`) typeStorage = regexp.MustCompile(`_errors$|_read$|_read_req$|_write$|_write_req$`)
) )
// OpenStack is the main structure associated with a collection instance.
type OpenStack struct { type OpenStack struct {
// Configuration variables // Configuration variables
IdentityEndpoint string `toml:"authentication_endpoint"` IdentityEndpoint string `toml:"authentication_endpoint"`
@ -93,19 +92,10 @@ type OpenStack struct {
services map[string]bool services map[string]bool
} }
// convertTimeFormat, to convert time format based on HumanReadableTS
func (o *OpenStack) convertTimeFormat(t time.Time) interface{} {
if o.HumanReadableTS {
return t.Format("2006-01-02T15:04:05.999999999Z07:00")
}
return t.UnixNano()
}
func (*OpenStack) SampleConfig() string { func (*OpenStack) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// initialize performs any necessary initialization functions
func (o *OpenStack) Init() error { func (o *OpenStack) Init() error {
if len(o.EnabledServices) == 0 { if len(o.EnabledServices) == 0 {
o.EnabledServices = []string{"services", "projects", "hypervisors", "flavors", "networks", "volumes"} o.EnabledServices = []string{"services", "projects", "hypervisors", "flavors", "networks", "volumes"}
@ -266,14 +256,6 @@ func (o *OpenStack) Start(telegraf.Accumulator) error {
return nil return nil
} }
func (o *OpenStack) Stop() {
if o.client != nil {
o.client.CloseIdleConnections()
}
}
// Gather gathers resources from the OpenStack API and accumulates metrics. This
// implements the Input interface.
func (o *OpenStack) Gather(acc telegraf.Accumulator) error { func (o *OpenStack) Gather(acc telegraf.Accumulator) error {
ctx := context.Background() ctx := context.Background()
callDuration := make(map[string]interface{}, len(o.services)) callDuration := make(map[string]interface{}, len(o.services))
@ -344,6 +326,12 @@ func (o *OpenStack) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (o *OpenStack) Stop() {
if o.client != nil {
o.client.CloseIdleConnections()
}
}
func (o *OpenStack) availableServicesFromAuth(provider *gophercloud.ProviderClient) (bool, error) { func (o *OpenStack) availableServicesFromAuth(provider *gophercloud.ProviderClient) (bool, error) {
authResult := provider.GetAuthResult() authResult := provider.GetAuthResult()
if authResult == nil { if authResult == nil {
@ -1067,7 +1055,14 @@ func (o *OpenStack) gatherServerDiagnostics(ctx context.Context, acc telegraf.Ac
return nil return nil
} }
// init registers a callback which creates a new OpenStack input instance. // convertTimeFormat, to convert time format based on HumanReadableTS
func (o *OpenStack) convertTimeFormat(t time.Time) interface{} {
if o.HumanReadableTS {
return t.Format("2006-01-02T15:04:05.999999999Z07:00")
}
return t.UnixNano()
}
func init() { func init() {
inputs.Add("openstack", func() telegraf.Input { inputs.Add("openstack", func() telegraf.Input {
return &OpenStack{ return &OpenStack{

View File

@ -46,10 +46,6 @@ func (*OpenTelemetry) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (*OpenTelemetry) Gather(telegraf.Accumulator) error {
return nil
}
func (o *OpenTelemetry) Init() error { func (o *OpenTelemetry) Init() error {
if o.ServiceAddress == "" { if o.ServiceAddress == "" {
o.ServiceAddress = "0.0.0.0:4317" o.ServiceAddress = "0.0.0.0:4317"
@ -123,6 +119,10 @@ func (o *OpenTelemetry) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (*OpenTelemetry) Gather(telegraf.Accumulator) error {
return nil
}
func (o *OpenTelemetry) Stop() { func (o *OpenTelemetry) Stop() {
if o.grpcServer != nil { if o.grpcServer != nil {
o.grpcServer.Stop() o.grpcServer.Stop()

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb-observability/otel2influx"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric"
@ -24,7 +25,6 @@ import (
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/encoding/protojson"
"github.com/influxdata/influxdb-observability/otel2influx"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"

View File

@ -174,7 +174,7 @@ func (n *OpenWeatherMap) gatherWeather(acc telegraf.Accumulator, city string) er
return fmt.Errorf("querying %q failed: %w", addr, err) return fmt.Errorf("querying %q failed: %w", addr, err)
} }
var e WeatherEntry var e weatherEntry
if err := json.Unmarshal(buf, &e); err != nil { if err := json.Unmarshal(buf, &e); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err) return fmt.Errorf("parsing JSON response failed: %w", err)
} }
@ -223,7 +223,7 @@ func (n *OpenWeatherMap) gatherWeatherBatch(acc telegraf.Accumulator, cities str
return fmt.Errorf("querying %q failed: %w", addr, err) return fmt.Errorf("querying %q failed: %w", addr, err)
} }
var status Status var status status
if err := json.Unmarshal(buf, &status); err != nil { if err := json.Unmarshal(buf, &status); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err) return fmt.Errorf("parsing JSON response failed: %w", err)
} }
@ -274,7 +274,7 @@ func (n *OpenWeatherMap) gatherForecast(acc telegraf.Accumulator, city string) e
return fmt.Errorf("querying %q failed: %w", addr, err) return fmt.Errorf("querying %q failed: %w", addr, err)
} }
var status Status var status status
if err := json.Unmarshal(buf, &status); err != nil { if err := json.Unmarshal(buf, &status); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err) return fmt.Errorf("parsing JSON response failed: %w", err)
} }

View File

@ -1,6 +1,6 @@
package openweathermap package openweathermap
type WeatherEntry struct { type weatherEntry struct {
Dt int64 `json:"dt"` Dt int64 `json:"dt"`
Clouds struct { Clouds struct {
All int64 `json:"all"` All int64 `json:"all"`
@ -43,21 +43,21 @@ type WeatherEntry struct {
} `json:"weather"` } `json:"weather"`
} }
func (e WeatherEntry) snow() float64 { func (e weatherEntry) snow() float64 {
if e.Snow.Snow1 > 0 { if e.Snow.Snow1 > 0 {
return e.Snow.Snow1 return e.Snow.Snow1
} }
return e.Snow.Snow3 return e.Snow.Snow3
} }
func (e WeatherEntry) rain() float64 { func (e weatherEntry) rain() float64 {
if e.Rain.Rain1 > 0 { if e.Rain.Rain1 > 0 {
return e.Rain.Rain1 return e.Rain.Rain1
} }
return e.Rain.Rain3 return e.Rain.Rain3
} }
type Status struct { type status struct {
City struct { City struct {
Coord struct { Coord struct {
Lat float64 `json:"lat"` Lat float64 `json:"lat"`
@ -67,5 +67,5 @@ type Status struct {
ID int64 `json:"id"` ID int64 `json:"id"`
Name string `json:"name"` Name string `json:"name"`
} `json:"city"` } `json:"city"`
List []WeatherEntry `json:"list"` List []weatherEntry `json:"list"`
} }