fix: Linter fixes for plugins/outputs/[a-f]* (#10124)
This commit is contained in:
parent
2a0c3059a1
commit
3dc5281632
|
|
@ -142,7 +142,7 @@ func (p *Point) setValue(v interface{}) error {
|
||||||
case float32:
|
case float32:
|
||||||
p[1] = float64(d)
|
p[1] = float64(d)
|
||||||
case float64:
|
case float64:
|
||||||
p[1] = float64(d)
|
p[1] = d
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("undeterminable type")
|
return fmt.Errorf("undeterminable type")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,13 +5,14 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"github.com/streadway/amqp"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -180,11 +181,11 @@ func (q *AMQP) SetSerializer(serializer serializers.Serializer) {
|
||||||
|
|
||||||
func (q *AMQP) Connect() error {
|
func (q *AMQP) Connect() error {
|
||||||
if q.config == nil {
|
if q.config == nil {
|
||||||
config, err := q.makeClientConfig()
|
clientConfig, err := q.makeClientConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
q.config = config
|
q.config = clientConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
@ -251,8 +252,8 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If this is the first attempt to publish and the connection is
|
// If this is the first attempt to publish and the connection is
|
||||||
// closed, try to reconnect and retry once.
|
// closed, try to reconnect and retry once.
|
||||||
|
//nolint: revive // Simplifying if-else with early return will reduce clarity
|
||||||
if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed {
|
if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed {
|
||||||
first = false
|
|
||||||
q.client = nil
|
q.client = nil
|
||||||
err := q.publish(key, body)
|
err := q.publish(key, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -268,7 +269,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 {
|
if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 {
|
||||||
q.Log.Debug("Sent MaxMessages; closing connection")
|
q.Log.Debug("Sent MaxMessages; closing connection")
|
||||||
q.client.Close()
|
if err := q.client.Close(); err != nil {
|
||||||
|
q.Log.Errorf("Closing connection failed: %v", err)
|
||||||
|
}
|
||||||
q.client = nil
|
q.client = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -315,52 +318,53 @@ func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
|
func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
|
||||||
config := &ClientConfig{
|
clientConfig := &ClientConfig{
|
||||||
exchange: q.Exchange,
|
exchange: q.Exchange,
|
||||||
exchangeType: q.ExchangeType,
|
exchangeType: q.ExchangeType,
|
||||||
exchangePassive: q.ExchangePassive,
|
exchangePassive: q.ExchangePassive,
|
||||||
encoding: q.ContentEncoding,
|
encoding: q.ContentEncoding,
|
||||||
timeout: time.Duration(q.Timeout),
|
timeout: time.Duration(q.Timeout),
|
||||||
|
log: q.Log,
|
||||||
}
|
}
|
||||||
|
|
||||||
switch q.ExchangeDurability {
|
switch q.ExchangeDurability {
|
||||||
case "transient":
|
case "transient":
|
||||||
config.exchangeDurable = false
|
clientConfig.exchangeDurable = false
|
||||||
default:
|
default:
|
||||||
config.exchangeDurable = true
|
clientConfig.exchangeDurable = true
|
||||||
}
|
}
|
||||||
|
|
||||||
config.brokers = q.Brokers
|
clientConfig.brokers = q.Brokers
|
||||||
if len(config.brokers) == 0 {
|
if len(clientConfig.brokers) == 0 {
|
||||||
config.brokers = []string{q.URL}
|
clientConfig.brokers = []string{q.URL}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch q.DeliveryMode {
|
switch q.DeliveryMode {
|
||||||
case "transient":
|
case "transient":
|
||||||
config.deliveryMode = amqp.Transient
|
clientConfig.deliveryMode = amqp.Transient
|
||||||
case "persistent":
|
case "persistent":
|
||||||
config.deliveryMode = amqp.Persistent
|
clientConfig.deliveryMode = amqp.Persistent
|
||||||
default:
|
default:
|
||||||
config.deliveryMode = amqp.Transient
|
clientConfig.deliveryMode = amqp.Transient
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(q.Headers) > 0 {
|
if len(q.Headers) > 0 {
|
||||||
config.headers = make(amqp.Table, len(q.Headers))
|
clientConfig.headers = make(amqp.Table, len(q.Headers))
|
||||||
for k, v := range q.Headers {
|
for k, v := range q.Headers {
|
||||||
config.headers[k] = v
|
clientConfig.headers[k] = v
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Copy deprecated fields into message header
|
// Copy deprecated fields into message header
|
||||||
config.headers = amqp.Table{
|
clientConfig.headers = amqp.Table{
|
||||||
"database": q.Database,
|
"database": q.Database,
|
||||||
"retention_policy": q.RetentionPolicy,
|
"retention_policy": q.RetentionPolicy,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(q.ExchangeArguments) > 0 {
|
if len(q.ExchangeArguments) > 0 {
|
||||||
config.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments))
|
clientConfig.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments))
|
||||||
for k, v := range q.ExchangeArguments {
|
for k, v := range q.ExchangeArguments {
|
||||||
config.exchangeArguments[k] = v
|
clientConfig.exchangeArguments[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -368,7 +372,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
config.tlsConfig = tlsConfig
|
clientConfig.tlsConfig = tlsConfig
|
||||||
|
|
||||||
var auth []amqp.Authentication
|
var auth []amqp.Authentication
|
||||||
if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
|
if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
|
||||||
|
|
@ -381,13 +385,13 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
config.auth = auth
|
clientConfig.auth = auth
|
||||||
|
|
||||||
return config, nil
|
return clientConfig, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect(config *ClientConfig) (Client, error) {
|
func connect(clientConfig *ClientConfig) (Client, error) {
|
||||||
return Connect(config)
|
return Connect(clientConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,13 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/streadway/amqp"
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
|
|
@ -25,6 +26,7 @@ type ClientConfig struct {
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
auth []amqp.Authentication
|
auth []amqp.Authentication
|
||||||
|
log telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
type client struct {
|
type client struct {
|
||||||
|
|
@ -42,7 +44,7 @@ func Connect(config *ClientConfig) (*client, error) {
|
||||||
p := rand.Perm(len(config.brokers))
|
p := rand.Perm(len(config.brokers))
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
broker := config.brokers[n]
|
broker := config.brokers[n]
|
||||||
log.Printf("D! Output [amqp] connecting to %q", broker)
|
config.log.Debugf("Connecting to %q", broker)
|
||||||
conn, err := amqp.DialConfig(
|
conn, err := amqp.DialConfig(
|
||||||
broker, amqp.Config{
|
broker, amqp.Config{
|
||||||
TLSClientConfig: config.tlsConfig,
|
TLSClientConfig: config.tlsConfig,
|
||||||
|
|
@ -53,10 +55,10 @@ func Connect(config *ClientConfig) (*client, error) {
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
client.conn = conn
|
client.conn = conn
|
||||||
log.Printf("D! Output [amqp] connected to %q", broker)
|
config.log.Debugf("Connected to %q", broker)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
log.Printf("D! Output [amqp] error connecting to %q - %s", broker, err.Error())
|
config.log.Debugf("Error connecting to %q - %v", broker, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if client.conn == nil {
|
if client.conn == nil {
|
||||||
|
|
|
||||||
|
|
@ -5,21 +5,18 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
|
||||||
|
|
||||||
"github.com/microsoft/ApplicationInsights-Go/appinsights"
|
"github.com/microsoft/ApplicationInsights-Go/appinsights"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/application_insights/mocks"
|
"github.com/influxdata/telegraf/plugins/outputs/application_insights/mocks"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestConnectFailsIfNoIkey(t *testing.T) {
|
func TestConnectFailsIfNoIkey(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
transmitter.On("Close").Return(closed)
|
transmitter.On("Close").Return(closed)
|
||||||
|
|
||||||
|
|
@ -31,12 +28,10 @@ func TestConnectFailsIfNoIkey(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Connect()
|
err := ai.Connect()
|
||||||
assert.Error(err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOutputCloseTimesOut(t *testing.T) {
|
func TestOutputCloseTimesOut(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
transmitter.On("Close").Return(unfinished)
|
transmitter.On("Close").Return(unfinished)
|
||||||
|
|
||||||
|
|
@ -47,13 +42,11 @@ func TestOutputCloseTimesOut(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Close()
|
err := ai.Close()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
transmitter.AssertCalled(t, "Close")
|
transmitter.AssertCalled(t, "Close")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCloseRemovesDiagMsgListener(t *testing.T) {
|
func TestCloseRemovesDiagMsgListener(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
transmitter.On("Close").Return(closed)
|
transmitter.On("Close").Return(closed)
|
||||||
|
|
||||||
|
|
@ -75,11 +68,11 @@ func TestCloseRemovesDiagMsgListener(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Connect()
|
err := ai.Connect()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
diagMsgSubscriber.AssertCalled(t, "Subscribe", mock.AnythingOfType("appinsights.DiagnosticsMessageHandler"))
|
diagMsgSubscriber.AssertCalled(t, "Subscribe", mock.AnythingOfType("appinsights.DiagnosticsMessageHandler"))
|
||||||
|
|
||||||
err = ai.Close()
|
err = ai.Close()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
transmitter.AssertCalled(t, "Close")
|
transmitter.AssertCalled(t, "Close")
|
||||||
diagMsgListener.AssertCalled(t, "Remove")
|
diagMsgListener.AssertCalled(t, "Remove")
|
||||||
}
|
}
|
||||||
|
|
@ -137,7 +130,6 @@ func TestAggregateMetricCreated(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
tf := func(t *testing.T) {
|
tf := func(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
|
|
@ -158,17 +150,18 @@ func TestAggregateMetricCreated(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Connect()
|
err := ai.Connect()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mSet := []telegraf.Metric{m}
|
mSet := []telegraf.Metric{m}
|
||||||
ai.Write(mSet)
|
err = ai.Write(mSet)
|
||||||
|
require.NoError(t, err)
|
||||||
transmitter.AssertNumberOfCalls(t, "Track", 1+len(tt.additionalMetricValueFields))
|
transmitter.AssertNumberOfCalls(t, "Track", 1+len(tt.additionalMetricValueFields))
|
||||||
var pAggregateTelemetry *appinsights.AggregateMetricTelemetry
|
var pAggregateTelemetry *appinsights.AggregateMetricTelemetry
|
||||||
assert.IsType(pAggregateTelemetry, transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0), "Expected last telemetry to be AggregateMetricTelemetry")
|
require.IsType(t, pAggregateTelemetry, transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0), "Expected last telemetry to be AggregateMetricTelemetry")
|
||||||
aggregateTelemetry := transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry)
|
aggregateTelemetry := transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry)
|
||||||
verifyAggregateTelemetry(assert, m, tt.valueField, tt.countField, aggregateTelemetry)
|
verifyAggregateTelemetry(t, m, tt.valueField, tt.countField, aggregateTelemetry)
|
||||||
|
|
||||||
verifyAdditionalTelemetry(assert, m, transmitter, tt.additionalMetricValueFields, metricName)
|
verifyAdditionalTelemetry(t, m, transmitter, tt.additionalMetricValueFields, metricName)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run(tt.name, tf)
|
t.Run(tt.name, tf)
|
||||||
|
|
@ -195,7 +188,6 @@ func TestSimpleMetricCreated(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
tf := func(t *testing.T) {
|
tf := func(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
|
|
@ -216,10 +208,11 @@ func TestSimpleMetricCreated(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Connect()
|
err := ai.Connect()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mSet := []telegraf.Metric{m}
|
mSet := []telegraf.Metric{m}
|
||||||
ai.Write(mSet)
|
err = ai.Write(mSet)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
expectedNumberOfCalls := len(tt.additionalMetricValueFields)
|
expectedNumberOfCalls := len(tt.additionalMetricValueFields)
|
||||||
if tt.primaryMetricValueField != "" {
|
if tt.primaryMetricValueField != "" {
|
||||||
|
|
@ -229,7 +222,7 @@ func TestSimpleMetricCreated(t *testing.T) {
|
||||||
transmitter.AssertNumberOfCalls(t, "Track", expectedNumberOfCalls)
|
transmitter.AssertNumberOfCalls(t, "Track", expectedNumberOfCalls)
|
||||||
if tt.primaryMetricValueField != "" {
|
if tt.primaryMetricValueField != "" {
|
||||||
var pMetricTelemetry *appinsights.MetricTelemetry
|
var pMetricTelemetry *appinsights.MetricTelemetry
|
||||||
assert.IsType(pMetricTelemetry, transmitter.Calls[0].Arguments.Get(0), "First created telemetry should be simple MetricTelemetry")
|
require.IsType(t, pMetricTelemetry, transmitter.Calls[0].Arguments.Get(0), "First created telemetry should be simple MetricTelemetry")
|
||||||
metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry)
|
metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry)
|
||||||
|
|
||||||
var expectedTelemetryName string
|
var expectedTelemetryName string
|
||||||
|
|
@ -238,10 +231,10 @@ func TestSimpleMetricCreated(t *testing.T) {
|
||||||
} else {
|
} else {
|
||||||
expectedTelemetryName = m.Name() + "_" + tt.primaryMetricValueField
|
expectedTelemetryName = m.Name() + "_" + tt.primaryMetricValueField
|
||||||
}
|
}
|
||||||
verifySimpleTelemetry(assert, m, tt.primaryMetricValueField, expectedTelemetryName, metricTelemetry)
|
verifySimpleTelemetry(t, m, tt.primaryMetricValueField, expectedTelemetryName, metricTelemetry)
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyAdditionalTelemetry(assert, m, transmitter, tt.additionalMetricValueFields, metricName)
|
verifyAdditionalTelemetry(t, m, transmitter, tt.additionalMetricValueFields, metricName)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run(tt.name, tf)
|
t.Run(tt.name, tf)
|
||||||
|
|
@ -265,7 +258,6 @@ func TestTagsAppliedToTelemetry(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
tf := func(t *testing.T) {
|
tf := func(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
|
|
@ -286,15 +278,16 @@ func TestTagsAppliedToTelemetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Connect()
|
err := ai.Connect()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mSet := []telegraf.Metric{m}
|
mSet := []telegraf.Metric{m}
|
||||||
ai.Write(mSet)
|
err = ai.Write(mSet)
|
||||||
|
require.NoError(t, err)
|
||||||
transmitter.AssertNumberOfCalls(t, "Track", len(tt.metricValueFields))
|
transmitter.AssertNumberOfCalls(t, "Track", len(tt.metricValueFields))
|
||||||
transmitter.AssertCalled(t, "Track", mock.AnythingOfType("*appinsights.MetricTelemetry"))
|
transmitter.AssertCalled(t, "Track", mock.AnythingOfType("*appinsights.MetricTelemetry"))
|
||||||
|
|
||||||
// Will verify that all original tags are present in telemetry.Properties map
|
// Will verify that all original tags are present in telemetry.Properties map
|
||||||
verifyAdditionalTelemetry(assert, m, transmitter, tt.metricValueFields, metricName)
|
verifyAdditionalTelemetry(t, m, transmitter, tt.metricValueFields, metricName)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run(tt.name, tf)
|
t.Run(tt.name, tf)
|
||||||
|
|
@ -302,7 +295,6 @@ func TestTagsAppliedToTelemetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestContextTagsSetOnSimpleTelemetry(t *testing.T) {
|
func TestContextTagsSetOnSimpleTelemetry(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
|
|
@ -327,19 +319,19 @@ func TestContextTagsSetOnSimpleTelemetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Connect()
|
err := ai.Connect()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mSet := []telegraf.Metric{m}
|
mSet := []telegraf.Metric{m}
|
||||||
ai.Write(mSet)
|
err = ai.Write(mSet)
|
||||||
|
require.NoError(t, err)
|
||||||
transmitter.AssertNumberOfCalls(t, "Track", 1)
|
transmitter.AssertNumberOfCalls(t, "Track", 1)
|
||||||
metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry)
|
metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry)
|
||||||
cloudTags := metricTelemetry.Tags.Cloud()
|
cloudTags := metricTelemetry.Tags.Cloud()
|
||||||
assert.Equal("atcsvc", cloudTags.GetRole())
|
require.Equal(t, "atcsvc", cloudTags.GetRole())
|
||||||
assert.Equal("bunkie17554", cloudTags.GetRoleInstance())
|
require.Equal(t, "bunkie17554", cloudTags.GetRoleInstance())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestContextTagsSetOnAggregateTelemetry(t *testing.T) {
|
func TestContextTagsSetOnAggregateTelemetry(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
transmitter := new(mocks.Transmitter)
|
transmitter := new(mocks.Transmitter)
|
||||||
|
|
@ -364,15 +356,16 @@ func TestContextTagsSetOnAggregateTelemetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ai.Connect()
|
err := ai.Connect()
|
||||||
assert.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mSet := []telegraf.Metric{m}
|
mSet := []telegraf.Metric{m}
|
||||||
ai.Write(mSet)
|
err = ai.Write(mSet)
|
||||||
|
require.NoError(t, err)
|
||||||
transmitter.AssertNumberOfCalls(t, "Track", 1)
|
transmitter.AssertNumberOfCalls(t, "Track", 1)
|
||||||
metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry)
|
metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry)
|
||||||
cloudTags := metricTelemetry.Tags.Cloud()
|
cloudTags := metricTelemetry.Tags.Cloud()
|
||||||
assert.Equal("atcsvc", cloudTags.GetRole())
|
require.Equal(t, "atcsvc", cloudTags.GetRole())
|
||||||
assert.Equal("bunkie17554", cloudTags.GetRoleInstance())
|
require.Equal(t, "bunkie17554", cloudTags.GetRoleInstance())
|
||||||
}
|
}
|
||||||
|
|
||||||
func closed() <-chan struct{} {
|
func closed() <-chan struct{} {
|
||||||
|
|
@ -387,49 +380,49 @@ func unfinished() <-chan struct{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyAggregateTelemetry(
|
func verifyAggregateTelemetry(
|
||||||
assert *assert.Assertions,
|
t *testing.T,
|
||||||
metric telegraf.Metric,
|
m telegraf.Metric,
|
||||||
valueField string,
|
valueField string,
|
||||||
countField string,
|
countField string,
|
||||||
telemetry *appinsights.AggregateMetricTelemetry,
|
telemetry *appinsights.AggregateMetricTelemetry,
|
||||||
) {
|
) {
|
||||||
verifyAggregateField := func(fieldName string, telemetryValue float64) {
|
verifyAggregateField := func(fieldName string, telemetryValue float64) {
|
||||||
metricRawFieldValue, found := metric.Fields()[fieldName]
|
metricRawFieldValue, found := m.Fields()[fieldName]
|
||||||
if !found {
|
if !found {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := toFloat64(metricRawFieldValue); err == nil {
|
if _, err := toFloat64(metricRawFieldValue); err == nil {
|
||||||
assert.EqualValues(metricRawFieldValue, telemetryValue, "Telemetry property %s does not match the metric field", fieldName)
|
require.EqualValues(t, metricRawFieldValue, telemetryValue, "Telemetry property %s does not match the metric field", fieldName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert.Equal(metric.Name(), telemetry.Name, "Telemetry name should be the same as metric name")
|
require.Equal(t, m.Name(), telemetry.Name, "Telemetry name should be the same as metric name")
|
||||||
assert.EqualValues(metric.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field")
|
require.EqualValues(t, m.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field")
|
||||||
assert.EqualValues(metric.Fields()[countField], telemetry.Count, "Telemetry sample count does not mach metric sample count field")
|
require.EqualValues(t, m.Fields()[countField], telemetry.Count, "Telemetry sample count does not mach metric sample count field")
|
||||||
verifyAggregateField("min", telemetry.Min)
|
verifyAggregateField("min", telemetry.Min)
|
||||||
verifyAggregateField("max", telemetry.Max)
|
verifyAggregateField("max", telemetry.Max)
|
||||||
verifyAggregateField("stdev", telemetry.StdDev)
|
verifyAggregateField("stdev", telemetry.StdDev)
|
||||||
verifyAggregateField("variance", telemetry.Variance)
|
verifyAggregateField("variance", telemetry.Variance)
|
||||||
assert.Equal(metric.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match")
|
require.Equal(t, m.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match")
|
||||||
assertMapContains(assert, metric.Tags(), telemetry.Properties)
|
assertMapContains(t, m.Tags(), telemetry.Properties)
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifySimpleTelemetry(
|
func verifySimpleTelemetry(
|
||||||
assert *assert.Assertions,
|
t *testing.T,
|
||||||
metric telegraf.Metric,
|
m telegraf.Metric,
|
||||||
valueField string,
|
valueField string,
|
||||||
expectedTelemetryName string,
|
expectedTelemetryName string,
|
||||||
telemetry *appinsights.MetricTelemetry,
|
telemetry *appinsights.MetricTelemetry,
|
||||||
) {
|
) {
|
||||||
assert.Equal(expectedTelemetryName, telemetry.Name, "Telemetry name is not what was expected")
|
require.Equal(t, expectedTelemetryName, telemetry.Name, "Telemetry name is not what was expected")
|
||||||
assert.EqualValues(metric.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field")
|
require.EqualValues(t, m.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field")
|
||||||
assert.Equal(metric.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match")
|
require.Equal(t, m.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match")
|
||||||
assertMapContains(assert, metric.Tags(), telemetry.Properties)
|
assertMapContains(t, m.Tags(), telemetry.Properties)
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyAdditionalTelemetry(
|
func verifyAdditionalTelemetry(
|
||||||
assert *assert.Assertions,
|
t *testing.T,
|
||||||
metric telegraf.Metric,
|
m telegraf.Metric,
|
||||||
transmitter *mocks.Transmitter,
|
transmitter *mocks.Transmitter,
|
||||||
additionalMetricValueFields []string,
|
additionalMetricValueFields []string,
|
||||||
telemetryNamePrefix string,
|
telemetryNamePrefix string,
|
||||||
|
|
@ -437,9 +430,9 @@ func verifyAdditionalTelemetry(
|
||||||
for _, fieldName := range additionalMetricValueFields {
|
for _, fieldName := range additionalMetricValueFields {
|
||||||
expectedTelemetryName := telemetryNamePrefix + "_" + fieldName
|
expectedTelemetryName := telemetryNamePrefix + "_" + fieldName
|
||||||
telemetry := findTransmittedTelemetry(transmitter, expectedTelemetryName)
|
telemetry := findTransmittedTelemetry(transmitter, expectedTelemetryName)
|
||||||
assert.NotNil(telemetry, "Expected telemetry named %s to be created, but could not find it", expectedTelemetryName)
|
require.NotNil(t, telemetry, "Expected telemetry named %s to be created, but could not find it", expectedTelemetryName)
|
||||||
if telemetry != nil {
|
if telemetry != nil {
|
||||||
verifySimpleTelemetry(assert, metric, fieldName, expectedTelemetryName, telemetry)
|
verifySimpleTelemetry(t, m, fieldName, expectedTelemetryName, telemetry)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -455,17 +448,17 @@ func findTransmittedTelemetry(transmitter *mocks.Transmitter, telemetryName stri
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertMapContains(assert *assert.Assertions, expected, actual map[string]string) {
|
func assertMapContains(t *testing.T, expected, actual map[string]string) {
|
||||||
if expected == nil && actual == nil {
|
if expected == nil && actual == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.NotNil(expected, "Maps not equal: expected is nil but actual is not")
|
require.NotNil(t, expected, "Maps not equal: expected is nil but actual is not")
|
||||||
assert.NotNil(actual, "Maps not equal: actual is nil but expected is not")
|
require.NotNil(t, actual, "Maps not equal: actual is nil but expected is not")
|
||||||
|
|
||||||
for k, v := range expected {
|
for k, v := range expected {
|
||||||
av, ok := actual[k]
|
av, ok := actual[k]
|
||||||
assert.True(ok, "Actual map does not contain a value for key '%s'", k)
|
require.True(t, ok, "Actual map does not contain a value for key '%s'", k)
|
||||||
assert.Equal(v, av, "The expected value for key '%s' is '%s' but the actual value is '%s", k, v, av)
|
require.Equal(t, v, av, "The expected value for key '%s' is '%s' but the actual value is '%s", k, v, av)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
"github.com/Azure/go-autorest/autorest"
|
"github.com/Azure/go-autorest/autorest"
|
||||||
"github.com/Azure/go-autorest/autorest/azure/auth"
|
"github.com/Azure/go-autorest/autorest/azure/auth"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
|
@ -208,7 +209,7 @@ func (a *AzureMonitor) Connect() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// vmMetadata retrieves metadata about the current Azure VM
|
// vmMetadata retrieves metadata about the current Azure VM
|
||||||
func vmInstanceMetadata(c *http.Client) (string, string, error) {
|
func vmInstanceMetadata(c *http.Client) (region string, resourceID string, err error) {
|
||||||
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
|
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", fmt.Errorf("error creating request: %v", err)
|
return "", "", fmt.Errorf("error creating request: %v", err)
|
||||||
|
|
@ -235,8 +236,8 @@ func vmInstanceMetadata(c *http.Client) (string, string, error) {
|
||||||
return "", "", err
|
return "", "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
region := metadata.Compute.Location
|
region = metadata.Compute.Location
|
||||||
resourceID := metadata.ResourceID()
|
resourceID = metadata.ResourceID()
|
||||||
|
|
||||||
return region, resourceID, nil
|
return region, resourceID, nil
|
||||||
}
|
}
|
||||||
|
|
@ -366,20 +367,20 @@ func (a *AzureMonitor) send(body []byte) error {
|
||||||
|
|
||||||
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
|
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
|
||||||
h := fnv.New64a()
|
h := fnv.New64a()
|
||||||
h.Write([]byte(m.Name()))
|
h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
for _, tag := range m.TagList() {
|
for _, tag := range m.TagList() {
|
||||||
if tag.Key == "" || tag.Value == "" {
|
if tag.Key == "" || tag.Value == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
h.Write([]byte(tag.Key))
|
h.Write([]byte(tag.Key)) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
}
|
}
|
||||||
b := make([]byte, binary.MaxVarintLen64)
|
b := make([]byte, binary.MaxVarintLen64)
|
||||||
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
|
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
|
||||||
h.Write(b[:n])
|
h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
return h.Sum64()
|
return h.Sum64()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -573,10 +574,10 @@ func hashIDWithField(id uint64, fk string) uint64 {
|
||||||
h := fnv.New64a()
|
h := fnv.New64a()
|
||||||
b := make([]byte, binary.MaxVarintLen64)
|
b := make([]byte, binary.MaxVarintLen64)
|
||||||
n := binary.PutUvarint(b, id)
|
n := binary.PutUvarint(b, id)
|
||||||
h.Write(b[:n])
|
h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
h.Write([]byte(fk))
|
h.Write([]byte(fk)) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
h.Write([]byte("\n"))
|
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
return h.Sum64()
|
return h.Sum64()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,15 @@
|
||||||
package cloud_pubsub
|
package cloud_pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/base64"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"encoding/base64"
|
|
||||||
|
|
||||||
"cloud.google.com/go/pubsub"
|
"cloud.google.com/go/pubsub"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPubSub_WriteSingle(t *testing.T) {
|
func TestPubSub_WriteSingle(t *testing.T) {
|
||||||
|
|
@ -51,8 +50,8 @@ func TestPubSub_WriteWithAttribute(t *testing.T) {
|
||||||
|
|
||||||
for _, testM := range testMetrics {
|
for _, testM := range testMetrics {
|
||||||
msg := verifyRawMetricPublished(t, testM.m, topic.published)
|
msg := verifyRawMetricPublished(t, testM.m, topic.published)
|
||||||
assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1")
|
require.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1")
|
||||||
assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2")
|
require.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,7 +73,7 @@ func TestPubSub_WriteMultiple(t *testing.T) {
|
||||||
for _, testM := range testMetrics {
|
for _, testM := range testMetrics {
|
||||||
verifyRawMetricPublished(t, testM.m, topic.published)
|
verifyRawMetricPublished(t, testM.m, topic.published)
|
||||||
}
|
}
|
||||||
assert.Equalf(t, 1, topic.getBundleCount(), "unexpected bundle count")
|
require.Equalf(t, 1, topic.getBundleCount(), "unexpected bundle count")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_WriteOverCountThreshold(t *testing.T) {
|
func TestPubSub_WriteOverCountThreshold(t *testing.T) {
|
||||||
|
|
@ -98,7 +97,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) {
|
||||||
for _, testM := range testMetrics {
|
for _, testM := range testMetrics {
|
||||||
verifyRawMetricPublished(t, testM.m, topic.published)
|
verifyRawMetricPublished(t, testM.m, topic.published)
|
||||||
}
|
}
|
||||||
assert.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count")
|
require.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_WriteOverByteThreshold(t *testing.T) {
|
func TestPubSub_WriteOverByteThreshold(t *testing.T) {
|
||||||
|
|
@ -121,7 +120,7 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) {
|
||||||
for _, testM := range testMetrics {
|
for _, testM := range testMetrics {
|
||||||
verifyRawMetricPublished(t, testM.m, topic.published)
|
verifyRawMetricPublished(t, testM.m, topic.published)
|
||||||
}
|
}
|
||||||
assert.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count")
|
require.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_WriteBase64Single(t *testing.T) {
|
func TestPubSub_WriteBase64Single(t *testing.T) {
|
||||||
|
|
@ -198,7 +197,7 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("expected published metric to have a value")
|
t.Fatalf("expected published metric to have a value")
|
||||||
}
|
}
|
||||||
assert.Equal(t, v, publishedV, "incorrect published value")
|
require.Equal(t, v, publishedV, "incorrect published value")
|
||||||
|
|
||||||
return psMsg
|
return psMsg
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -247,7 +247,7 @@ func (c *CloudWatch) WriteToCloudWatch(datums []types.MetricDatum) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Partition the MetricDatums into smaller slices of a max size so that are under the limit
|
// PartitionDatums partitions the MetricDatums into smaller slices of a max size so that are under the limit
|
||||||
// for the AWS API calls.
|
// for the AWS API calls.
|
||||||
func PartitionDatums(size int, datums []types.MetricDatum) [][]types.MetricDatum {
|
func PartitionDatums(size int, datums []types.MetricDatum) [][]types.MetricDatum {
|
||||||
numberOfPartitions := len(datums) / size
|
numberOfPartitions := len(datums) / size
|
||||||
|
|
@ -270,7 +270,7 @@ func PartitionDatums(size int, datums []types.MetricDatum) [][]types.MetricDatum
|
||||||
return partitions
|
return partitions
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a MetricDatum from telegraf.Metric. It would check if all required fields of
|
// BuildMetricDatum makes a MetricDatum from telegraf.Metric. It would check if all required fields of
|
||||||
// cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values.
|
// cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values.
|
||||||
// Otherwise, fields would still been built independently.
|
// Otherwise, fields would still been built independently.
|
||||||
func BuildMetricDatum(buildStatistic bool, highResolutionMetrics bool, point telegraf.Metric) []types.MetricDatum {
|
func BuildMetricDatum(buildStatistic bool, highResolutionMetrics bool, point telegraf.Metric) []types.MetricDatum {
|
||||||
|
|
@ -332,14 +332,14 @@ func BuildMetricDatum(buildStatistic bool, highResolutionMetrics bool, point tel
|
||||||
return datums
|
return datums
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to
|
// BuildDimensions makes a list of Dimensions by using a Point's tags. CloudWatch supports up to
|
||||||
// 10 dimensions per metric so we only keep up to the first 10 alphabetically.
|
// 10 dimensions per metric, so we only keep up to the first 10 alphabetically.
|
||||||
// This always includes the "host" tag if it exists.
|
// This always includes the "host" tag if it exists.
|
||||||
func BuildDimensions(mTags map[string]string) []types.Dimension {
|
func BuildDimensions(mTags map[string]string) []types.Dimension {
|
||||||
const MaxDimensions = 10
|
const maxDimensions = 10
|
||||||
dimensions := make([]types.Dimension, 0, MaxDimensions)
|
dimensions := make([]types.Dimension, 0, maxDimensions)
|
||||||
|
|
||||||
// This is pretty ugly but we always want to include the "host" tag if it exists.
|
// This is pretty ugly, but we always want to include the "host" tag if it exists.
|
||||||
if host, ok := mTags["host"]; ok {
|
if host, ok := mTags["host"]; ok {
|
||||||
dimensions = append(dimensions, types.Dimension{
|
dimensions = append(dimensions, types.Dimension{
|
||||||
Name: aws.String("host"),
|
Name: aws.String("host"),
|
||||||
|
|
@ -356,7 +356,7 @@ func BuildDimensions(mTags map[string]string) []types.Dimension {
|
||||||
sort.Strings(keys)
|
sort.Strings(keys)
|
||||||
|
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
if len(dimensions) >= MaxDimensions {
|
if len(dimensions) >= maxDimensions {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -392,7 +392,8 @@ func getStatisticType(name string) (sType statisticType, fieldName string) {
|
||||||
sType = statisticTypeNone
|
sType = statisticTypeNone
|
||||||
fieldName = name
|
fieldName = name
|
||||||
}
|
}
|
||||||
return
|
|
||||||
|
return sType, fieldName
|
||||||
}
|
}
|
||||||
|
|
||||||
func convert(v interface{}) (value float64, ok bool) {
|
func convert(v interface{}) (value float64, ok bool) {
|
||||||
|
|
@ -420,7 +421,7 @@ func convert(v interface{}) (value float64, ok bool) {
|
||||||
default:
|
default:
|
||||||
// Skip unsupported type.
|
// Skip unsupported type.
|
||||||
ok = false
|
ok = false
|
||||||
return
|
return value, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do CloudWatch boundary checking
|
// Do CloudWatch boundary checking
|
||||||
|
|
@ -436,7 +437,7 @@ func convert(v interface{}) (value float64, ok bool) {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return value, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
||||||
|
|
@ -2,26 +2,23 @@ package cloudwatch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
|
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test that each tag becomes one dimension
|
// Test that each tag becomes one dimension
|
||||||
func TestBuildDimensions(t *testing.T) {
|
func TestBuildDimensions(t *testing.T) {
|
||||||
const MaxDimensions = 10
|
const maxDimensions = 10
|
||||||
|
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
testPoint := testutil.TestMetric(1)
|
testPoint := testutil.TestMetric(1)
|
||||||
dimensions := BuildDimensions(testPoint.Tags())
|
dimensions := BuildDimensions(testPoint.Tags())
|
||||||
|
|
@ -35,26 +32,24 @@ func TestBuildDimensions(t *testing.T) {
|
||||||
|
|
||||||
sort.Strings(tagKeys)
|
sort.Strings(tagKeys)
|
||||||
|
|
||||||
if len(testPoint.Tags()) >= MaxDimensions {
|
if len(testPoint.Tags()) >= maxDimensions {
|
||||||
assert.Equal(MaxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions")
|
require.Equal(t, maxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions")
|
||||||
} else {
|
} else {
|
||||||
assert.Equal(len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags")
|
require.Equal(t, len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags")
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, key := range tagKeys {
|
for i, key := range tagKeys {
|
||||||
if i >= 10 {
|
if i >= 10 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
assert.Equal(key, *dimensions[i].Name, "Key should be equal")
|
require.Equal(t, key, *dimensions[i].Name, "Key should be equal")
|
||||||
assert.Equal(testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal")
|
require.Equal(t, testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that metrics with valid values have a MetricDatum created where as non valid do not.
|
// Test that metrics with valid values have a MetricDatum created where as non valid do not.
|
||||||
// Skips "time.Time" type as something is converting the value to string.
|
// Skips "time.Time" type as something is converting the value to string.
|
||||||
func TestBuildMetricDatums(t *testing.T) {
|
func TestBuildMetricDatums(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
zero := 0.0
|
zero := 0.0
|
||||||
validMetrics := []telegraf.Metric{
|
validMetrics := []telegraf.Metric{
|
||||||
testutil.TestMetric(1),
|
testutil.TestMetric(1),
|
||||||
|
|
@ -75,11 +70,11 @@ func TestBuildMetricDatums(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, point := range validMetrics {
|
for _, point := range validMetrics {
|
||||||
datums := BuildMetricDatum(false, false, point)
|
datums := BuildMetricDatum(false, false, point)
|
||||||
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
|
require.Equal(t, 1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
|
||||||
}
|
}
|
||||||
for _, point := range invalidMetrics {
|
for _, point := range invalidMetrics {
|
||||||
datums := BuildMetricDatum(false, false, point)
|
datums := BuildMetricDatum(false, false, point)
|
||||||
assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
|
require.Equal(t, 0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
|
||||||
}
|
}
|
||||||
|
|
||||||
statisticMetric := metric.New(
|
statisticMetric := metric.New(
|
||||||
|
|
@ -89,7 +84,7 @@ func TestBuildMetricDatums(t *testing.T) {
|
||||||
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
)
|
)
|
||||||
datums := BuildMetricDatum(true, false, statisticMetric)
|
datums := BuildMetricDatum(true, false, statisticMetric)
|
||||||
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric))
|
require.Equal(t, 1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric))
|
||||||
|
|
||||||
multiFieldsMetric := metric.New(
|
multiFieldsMetric := metric.New(
|
||||||
"test1",
|
"test1",
|
||||||
|
|
@ -98,7 +93,7 @@ func TestBuildMetricDatums(t *testing.T) {
|
||||||
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
)
|
)
|
||||||
datums = BuildMetricDatum(true, false, multiFieldsMetric)
|
datums = BuildMetricDatum(true, false, multiFieldsMetric)
|
||||||
assert.Equal(4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multiFieldsMetric))
|
require.Equal(t, 4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multiFieldsMetric))
|
||||||
|
|
||||||
multiStatisticMetric := metric.New(
|
multiStatisticMetric := metric.New(
|
||||||
"test1",
|
"test1",
|
||||||
|
|
@ -112,24 +107,22 @@ func TestBuildMetricDatums(t *testing.T) {
|
||||||
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
)
|
)
|
||||||
datums = BuildMetricDatum(true, false, multiStatisticMetric)
|
datums = BuildMetricDatum(true, false, multiStatisticMetric)
|
||||||
assert.Equal(7, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", multiStatisticMetric))
|
require.Equal(t, 7, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", multiStatisticMetric))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMetricDatumResolution(t *testing.T) {
|
func TestMetricDatumResolution(t *testing.T) {
|
||||||
const expectedStandardResolutionValue = int32(60)
|
const expectedStandardResolutionValue = int32(60)
|
||||||
const expectedHighResolutionValue = int32(1)
|
const expectedHighResolutionValue = int32(1)
|
||||||
|
|
||||||
assert := assert.New(t)
|
m := testutil.TestMetric(1)
|
||||||
|
|
||||||
metric := testutil.TestMetric(1)
|
standardResolutionDatum := BuildMetricDatum(false, false, m)
|
||||||
|
|
||||||
standardResolutionDatum := BuildMetricDatum(false, false, metric)
|
|
||||||
actualStandardResolutionValue := *standardResolutionDatum[0].StorageResolution
|
actualStandardResolutionValue := *standardResolutionDatum[0].StorageResolution
|
||||||
assert.Equal(expectedStandardResolutionValue, actualStandardResolutionValue)
|
require.Equal(t, expectedStandardResolutionValue, actualStandardResolutionValue)
|
||||||
|
|
||||||
highResolutionDatum := BuildMetricDatum(false, true, metric)
|
highResolutionDatum := BuildMetricDatum(false, true, m)
|
||||||
actualHighResolutionValue := *highResolutionDatum[0].StorageResolution
|
actualHighResolutionValue := *highResolutionDatum[0].StorageResolution
|
||||||
assert.Equal(expectedHighResolutionValue, actualHighResolutionValue)
|
require.Equal(t, expectedHighResolutionValue, actualHighResolutionValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildMetricDatums_SkipEmptyTags(t *testing.T) {
|
func TestBuildMetricDatums_SkipEmptyTags(t *testing.T) {
|
||||||
|
|
@ -150,8 +143,6 @@ func TestBuildMetricDatums_SkipEmptyTags(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPartitionDatums(t *testing.T) {
|
func TestPartitionDatums(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
testDatum := types.MetricDatum{
|
testDatum := types.MetricDatum{
|
||||||
MetricName: aws.String("Foo"),
|
MetricName: aws.String("Foo"),
|
||||||
Value: aws.Float64(1),
|
Value: aws.Float64(1),
|
||||||
|
|
@ -162,9 +153,9 @@ func TestPartitionDatums(t *testing.T) {
|
||||||
twoDatum := []types.MetricDatum{testDatum, testDatum}
|
twoDatum := []types.MetricDatum{testDatum, testDatum}
|
||||||
threeDatum := []types.MetricDatum{testDatum, testDatum, testDatum}
|
threeDatum := []types.MetricDatum{testDatum, testDatum, testDatum}
|
||||||
|
|
||||||
assert.Equal([][]types.MetricDatum{}, PartitionDatums(2, zeroDatum))
|
require.Equal(t, [][]types.MetricDatum{}, PartitionDatums(2, zeroDatum))
|
||||||
assert.Equal([][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
||||||
assert.Equal([][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
||||||
assert.Equal([][]types.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum))
|
require.Equal(t, [][]types.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum))
|
||||||
assert.Equal([][]types.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum))
|
require.Equal(t, [][]types.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,10 +11,11 @@ import (
|
||||||
|
|
||||||
cloudwatchlogsV2 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
|
cloudwatchlogsV2 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
internalaws "github.com/influxdata/telegraf/config/aws"
|
internalaws "github.com/influxdata/telegraf/config/aws"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockCloudWatchLogs struct {
|
type mockCloudWatchLogs struct {
|
||||||
|
|
@ -57,9 +58,7 @@ func (c *mockCloudWatchLogs) PutLogEvents(_ context.Context, input *cloudwatchlo
|
||||||
sequenceToken := "arbitraryToken"
|
sequenceToken := "arbitraryToken"
|
||||||
output := &cloudwatchlogsV2.PutLogEventsOutput{NextSequenceToken: &sequenceToken}
|
output := &cloudwatchlogsV2.PutLogEventsOutput{NextSequenceToken: &sequenceToken}
|
||||||
//Saving messages
|
//Saving messages
|
||||||
for _, event := range input.LogEvents {
|
c.pushedLogEvents = append(c.pushedLogEvents, input.LogEvents...)
|
||||||
c.pushedLogEvents = append(c.pushedLogEvents, event)
|
|
||||||
}
|
|
||||||
|
|
||||||
return output, nil
|
return output, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,10 +11,11 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/jackc/pgx/v4/stdlib" //to register stdlib from PostgreSQL Driver and Toolkit
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
_ "github.com/jackc/pgx/v4/stdlib" //to register stdlib from PostgreSQL Driver and Toolkit
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const MaxInt64 = int64(^uint64(0) >> 1)
|
const MaxInt64 = int64(^uint64(0) >> 1)
|
||||||
|
|
@ -47,7 +48,7 @@ func (c *CrateDB) Connect() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if c.TableCreate {
|
} else if c.TableCreate {
|
||||||
sql := `
|
query := `
|
||||||
CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
|
CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
|
||||||
"hash_id" LONG INDEX OFF,
|
"hash_id" LONG INDEX OFF,
|
||||||
"timestamp" TIMESTAMP,
|
"timestamp" TIMESTAMP,
|
||||||
|
|
@ -60,7 +61,7 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
|
||||||
`
|
`
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout))
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout))
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if _, err := db.ExecContext(ctx, sql); err != nil {
|
if _, err := db.ExecContext(ctx, query); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -106,10 +107,10 @@ func insertSQL(table string, keyReplacement string, metrics []telegraf.Metric) (
|
||||||
}
|
}
|
||||||
rows[i] = `(` + strings.Join(escapedCols, ", ") + `)`
|
rows[i] = `(` + strings.Join(escapedCols, ", ") + `)`
|
||||||
}
|
}
|
||||||
sql := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields")
|
query := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields")
|
||||||
VALUES
|
VALUES
|
||||||
` + strings.Join(rows, " ,\n") + `;`
|
` + strings.Join(rows, " ,\n") + `;`
|
||||||
return sql, nil
|
return query, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// escapeValue returns a string version of val that is suitable for being used
|
// escapeValue returns a string version of val that is suitable for being used
|
||||||
|
|
@ -206,7 +207,7 @@ func escapeString(s string, quote string) string {
|
||||||
// [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201
|
// [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201
|
||||||
func hashID(m telegraf.Metric) int64 {
|
func hashID(m telegraf.Metric) int64 {
|
||||||
h := sha512.New()
|
h := sha512.New()
|
||||||
h.Write([]byte(m.Name()))
|
h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
tags := m.Tags()
|
tags := m.Tags()
|
||||||
tmp := make([]string, len(tags))
|
tmp := make([]string, len(tags))
|
||||||
i := 0
|
i := 0
|
||||||
|
|
@ -217,7 +218,7 @@ func hashID(m telegraf.Metric) int64 {
|
||||||
sort.Strings(tmp)
|
sort.Strings(tmp)
|
||||||
|
|
||||||
for _, s := range tmp {
|
for _, s := range tmp {
|
||||||
h.Write([]byte(s))
|
h.Write([]byte(s)) //nolint:revive // from hash.go: "It never returns an error"
|
||||||
}
|
}
|
||||||
sum := h.Sum(nil)
|
sum := h.Sum(nil)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -200,7 +200,7 @@ func (p *Point) setValue(v interface{}) error {
|
||||||
case uint64:
|
case uint64:
|
||||||
p[1] = float64(d)
|
p[1] = float64(d)
|
||||||
case float64:
|
case float64:
|
||||||
p[1] = float64(d)
|
p[1] = d
|
||||||
case bool:
|
case bool:
|
||||||
p[1] = float64(0)
|
p[1] = float64(0)
|
||||||
if d {
|
if d {
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,10 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -36,6 +36,7 @@ func fakeDatadog() *Datadog {
|
||||||
func TestUriOverride(t *testing.T) {
|
func TestUriOverride(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
//nolint:errcheck,revive // Ignore the returned error as the test will fail anyway
|
||||||
json.NewEncoder(w).Encode(`{"status":"ok"}`)
|
json.NewEncoder(w).Encode(`{"status":"ok"}`)
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
@ -51,6 +52,7 @@ func TestUriOverride(t *testing.T) {
|
||||||
func TestBadStatusCode(t *testing.T) {
|
func TestBadStatusCode(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
//nolint:errcheck,revive // Ignore the returned error as the test will fail anyway
|
||||||
json.NewEncoder(w).Encode(`{ 'errors': [
|
json.NewEncoder(w).Encode(`{ 'errors': [
|
||||||
'Something bad happened to the server.',
|
'Something bad happened to the server.',
|
||||||
'Your query made the server very sad.'
|
'Your query made the server very sad.'
|
||||||
|
|
@ -75,7 +77,7 @@ func TestAuthenticatedUrl(t *testing.T) {
|
||||||
d := fakeDatadog()
|
d := fakeDatadog()
|
||||||
|
|
||||||
authURL := d.authenticatedURL()
|
authURL := d.authenticatedURL()
|
||||||
assert.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeURL, fakeAPIKey), authURL)
|
require.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeURL, fakeAPIKey), authURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildTags(t *testing.T) {
|
func TestBuildTags(t *testing.T) {
|
||||||
|
|
@ -173,7 +175,7 @@ func TestBuildPoint(t *testing.T) {
|
||||||
nil,
|
nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
testutil.TestMetric(bool(true), "test7"),
|
testutil.TestMetric(true, "test7"),
|
||||||
Point{
|
Point{
|
||||||
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
1.0,
|
1.0,
|
||||||
|
|
@ -181,7 +183,7 @@ func TestBuildPoint(t *testing.T) {
|
||||||
nil,
|
nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
testutil.TestMetric(bool(false), "test8"),
|
testutil.TestMetric(false, "test8"),
|
||||||
Point{
|
Point{
|
||||||
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
|
||||||
0.0,
|
0.0,
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -12,12 +11,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"gopkg.in/olivere/elastic.v5"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"gopkg.in/olivere/elastic.v5"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Elasticsearch struct {
|
type Elasticsearch struct {
|
||||||
|
|
@ -36,6 +35,7 @@ type Elasticsearch struct {
|
||||||
OverwriteTemplate bool
|
OverwriteTemplate bool
|
||||||
ForceDocumentID bool `toml:"force_document_id"`
|
ForceDocumentID bool `toml:"force_document_id"`
|
||||||
MajorReleaseNumber int
|
MajorReleaseNumber int
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
||||||
Client *elastic.Client
|
Client *elastic.Client
|
||||||
|
|
@ -174,7 +174,7 @@ type templatePart struct {
|
||||||
|
|
||||||
func (a *Elasticsearch) Connect() error {
|
func (a *Elasticsearch) Connect() error {
|
||||||
if a.URLs == nil || a.IndexName == "" {
|
if a.URLs == nil || a.IndexName == "" {
|
||||||
return fmt.Errorf("Elasticsearch urls or index_name is not defined")
|
return fmt.Errorf("elasticsearch urls or index_name is not defined")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout))
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout))
|
||||||
|
|
@ -213,7 +213,7 @@ func (a *Elasticsearch) Connect() error {
|
||||||
clientOptions = append(clientOptions,
|
clientOptions = append(clientOptions,
|
||||||
elastic.SetHealthcheck(false),
|
elastic.SetHealthcheck(false),
|
||||||
)
|
)
|
||||||
log.Printf("D! Elasticsearch output: disabling health check")
|
a.Log.Debugf("Disabling health check")
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := elastic.NewClient(clientOptions...)
|
client, err := elastic.NewClient(clientOptions...)
|
||||||
|
|
@ -226,16 +226,16 @@ func (a *Elasticsearch) Connect() error {
|
||||||
esVersion, err := client.ElasticsearchVersion(a.URLs[0])
|
esVersion, err := client.ElasticsearchVersion(a.URLs[0])
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Elasticsearch version check failed: %s", err)
|
return fmt.Errorf("elasticsearch version check failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// quit if ES version is not supported
|
// quit if ES version is not supported
|
||||||
majorReleaseNumber, err := strconv.Atoi(strings.Split(esVersion, ".")[0])
|
majorReleaseNumber, err := strconv.Atoi(strings.Split(esVersion, ".")[0])
|
||||||
if err != nil || majorReleaseNumber < 5 {
|
if err != nil || majorReleaseNumber < 5 {
|
||||||
return fmt.Errorf("Elasticsearch version not supported: %s", esVersion)
|
return fmt.Errorf("elasticsearch version not supported: %s", esVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("I! Elasticsearch version: " + esVersion)
|
a.Log.Infof("Elasticsearch version: %q", esVersion)
|
||||||
|
|
||||||
a.Client = client
|
a.Client = client
|
||||||
a.MajorReleaseNumber = majorReleaseNumber
|
a.MajorReleaseNumber = majorReleaseNumber
|
||||||
|
|
@ -257,9 +257,9 @@ func GetPointID(m telegraf.Metric) string {
|
||||||
var buffer bytes.Buffer
|
var buffer bytes.Buffer
|
||||||
//Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID
|
//Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID
|
||||||
|
|
||||||
buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
|
buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10)) //nolint:revive // from buffer.go: "err is always nil"
|
||||||
buffer.WriteString(m.Name())
|
buffer.WriteString(m.Name()) //nolint:revive // from buffer.go: "err is always nil"
|
||||||
buffer.WriteString(strconv.FormatUint(m.HashID(), 10))
|
buffer.WriteString(strconv.FormatUint(m.HashID(), 10)) //nolint:revive // from buffer.go: "err is always nil"
|
||||||
|
|
||||||
return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes()))
|
return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes()))
|
||||||
}
|
}
|
||||||
|
|
@ -305,15 +305,15 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||||
res, err := bulkRequest.Do(ctx)
|
res, err := bulkRequest.Do(ctx)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error sending bulk request to Elasticsearch: %s", err)
|
return fmt.Errorf("error sending bulk request to Elasticsearch: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.Errors {
|
if res.Errors {
|
||||||
for id, err := range res.Failed() {
|
for id, err := range res.Failed() {
|
||||||
log.Printf("E! Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"])
|
a.Log.Errorf("Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"])
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
return fmt.Errorf("W! Elasticsearch failed to index %d metrics", len(res.Failed()))
|
return fmt.Errorf("elasticsearch failed to index %d metrics", len(res.Failed()))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -321,13 +321,13 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
||||||
if a.TemplateName == "" {
|
if a.TemplateName == "" {
|
||||||
return fmt.Errorf("Elasticsearch template_name configuration not defined")
|
return fmt.Errorf("elasticsearch template_name configuration not defined")
|
||||||
}
|
}
|
||||||
|
|
||||||
templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx)
|
templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx)
|
||||||
|
|
||||||
if errExists != nil {
|
if errExists != nil {
|
||||||
return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
|
return fmt.Errorf("elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
|
||||||
}
|
}
|
||||||
|
|
||||||
templatePattern := a.IndexName
|
templatePattern := a.IndexName
|
||||||
|
|
@ -341,7 +341,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if templatePattern == "" {
|
if templatePattern == "" {
|
||||||
return fmt.Errorf("Template cannot be created for dynamic index names without an index prefix")
|
return fmt.Errorf("template cannot be created for dynamic index names without an index prefix")
|
||||||
}
|
}
|
||||||
|
|
||||||
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
|
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
|
||||||
|
|
@ -353,16 +353,18 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
||||||
t := template.Must(template.New("template").Parse(telegrafTemplate))
|
t := template.Must(template.New("template").Parse(telegrafTemplate))
|
||||||
var tmpl bytes.Buffer
|
var tmpl bytes.Buffer
|
||||||
|
|
||||||
t.Execute(&tmpl, tp)
|
if err := t.Execute(&tmpl, tp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx)
|
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx)
|
||||||
|
|
||||||
if errCreateTemplate != nil {
|
if errCreateTemplate != nil {
|
||||||
return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate)
|
return fmt.Errorf("elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("D! Elasticsearch template %s created or updated\n", a.TemplateName)
|
a.Log.Debugf("Template %s created or updated\n", a.TemplateName)
|
||||||
} else {
|
} else {
|
||||||
log.Println("D! Found existing Elasticsearch template. Skipping template management")
|
a.Log.Debug("Found existing Elasticsearch template. Skipping template management")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -384,7 +386,7 @@ func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
|
||||||
)
|
)
|
||||||
|
|
||||||
indexName = tagReplacer.Replace(indexName)
|
indexName = tagReplacer.Replace(indexName)
|
||||||
tagKeys = append(tagKeys, (strings.TrimSpace(tagName)))
|
tagKeys = append(tagKeys, strings.TrimSpace(tagName))
|
||||||
|
|
||||||
startTag = strings.Index(indexName, "{{")
|
startTag = strings.Index(indexName, "{{")
|
||||||
}
|
}
|
||||||
|
|
@ -413,7 +415,7 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagK
|
||||||
if value, ok := metricTags[key]; ok {
|
if value, ok := metricTags[key]; ok {
|
||||||
tagValues = append(tagValues, value)
|
tagValues = append(tagValues, value)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("D! Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue)
|
a.Log.Debugf("Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue)
|
||||||
tagValues = append(tagValues, a.DefaultTagValue)
|
tagValues = append(tagValues, a.DefaultTagValue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
|
||||||
TemplateName: "telegraf",
|
TemplateName: "telegraf",
|
||||||
OverwriteTemplate: false,
|
OverwriteTemplate: false,
|
||||||
HealthCheckInterval: config.Duration(time.Second * 10),
|
HealthCheckInterval: config.Duration(time.Second * 10),
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that we can connect to Elasticsearch
|
// Verify that we can connect to Elasticsearch
|
||||||
|
|
@ -57,6 +58,7 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
|
||||||
ManageTemplate: true,
|
ManageTemplate: true,
|
||||||
TemplateName: "",
|
TemplateName: "",
|
||||||
OverwriteTemplate: true,
|
OverwriteTemplate: true,
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := e.manageTemplate(ctx)
|
err := e.manageTemplate(ctx)
|
||||||
|
|
@ -78,6 +80,7 @@ func TestTemplateManagementIntegration(t *testing.T) {
|
||||||
ManageTemplate: true,
|
ManageTemplate: true,
|
||||||
TemplateName: "telegraf",
|
TemplateName: "telegraf",
|
||||||
OverwriteTemplate: true,
|
OverwriteTemplate: true,
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
|
||||||
|
|
@ -105,6 +108,7 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) {
|
||||||
ManageTemplate: true,
|
ManageTemplate: true,
|
||||||
TemplateName: "telegraf",
|
TemplateName: "telegraf",
|
||||||
OverwriteTemplate: true,
|
OverwriteTemplate: true,
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := e.Connect()
|
err := e.Connect()
|
||||||
|
|
@ -114,6 +118,7 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) {
|
||||||
func TestGetTagKeys(t *testing.T) {
|
func TestGetTagKeys(t *testing.T) {
|
||||||
e := &Elasticsearch{
|
e := &Elasticsearch{
|
||||||
DefaultTagValue: "none",
|
DefaultTagValue: "none",
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
|
|
@ -173,6 +178,7 @@ func TestGetTagKeys(t *testing.T) {
|
||||||
func TestGetIndexName(t *testing.T) {
|
func TestGetIndexName(t *testing.T) {
|
||||||
e := &Elasticsearch{
|
e := &Elasticsearch{
|
||||||
DefaultTagValue: "none",
|
DefaultTagValue: "none",
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
|
|
@ -286,6 +292,7 @@ func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) {
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
EnableGzip: true,
|
EnableGzip: true,
|
||||||
ManageTemplate: false,
|
ManageTemplate: false,
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := e.Connect()
|
err := e.Connect()
|
||||||
|
|
@ -319,6 +326,7 @@ func TestRequestHeaderWhenGzipIsDisabled(t *testing.T) {
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
EnableGzip: false,
|
EnableGzip: false,
|
||||||
ManageTemplate: false,
|
ManageTemplate: false,
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := e.Connect()
|
err := e.Connect()
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -22,6 +21,7 @@ const maxStderrBytes = 512
|
||||||
type Exec struct {
|
type Exec struct {
|
||||||
Command []string `toml:"command"`
|
Command []string `toml:"command"`
|
||||||
Timeout config.Duration `toml:"timeout"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
runner Runner
|
runner Runner
|
||||||
serializer serializers.Serializer
|
serializer serializers.Serializer
|
||||||
|
|
@ -42,6 +42,8 @@ var sampleConfig = `
|
||||||
`
|
`
|
||||||
|
|
||||||
func (e *Exec) Init() error {
|
func (e *Exec) Init() error {
|
||||||
|
e.runner = &CommandRunner{log: e.Log}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -77,7 +79,7 @@ func (e *Exec) Write(metrics []telegraf.Metric) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
buffer.Write(serializedMetrics)
|
buffer.Write(serializedMetrics) //nolint:revive // from buffer.go: "err is always nil"
|
||||||
|
|
||||||
if buffer.Len() <= 0 {
|
if buffer.Len() <= 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -94,6 +96,7 @@ type Runner interface {
|
||||||
// CommandRunner runs a command with the ability to kill the process before the timeout.
|
// CommandRunner runs a command with the ability to kill the process before the timeout.
|
||||||
type CommandRunner struct {
|
type CommandRunner struct {
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
|
log telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run runs the command.
|
// Run runs the command.
|
||||||
|
|
@ -114,9 +117,9 @@ func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.R
|
||||||
s = removeWindowsCarriageReturns(s)
|
s = removeWindowsCarriageReturns(s)
|
||||||
if s.Len() > 0 {
|
if s.Len() > 0 {
|
||||||
if !telegraf.Debug {
|
if !telegraf.Debug {
|
||||||
log.Printf("E! [outputs.exec] Command error: %q", c.truncate(s))
|
c.log.Errorf("Command error: %q", c.truncate(s))
|
||||||
} else {
|
} else {
|
||||||
log.Printf("D! [outputs.exec] Command error: %q", s)
|
c.log.Debugf("Command error: %q", s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -147,7 +150,7 @@ func (c *CommandRunner) truncate(buf bytes.Buffer) string {
|
||||||
buf.Truncate(i)
|
buf.Truncate(i)
|
||||||
}
|
}
|
||||||
if didTruncate {
|
if didTruncate {
|
||||||
buf.WriteString("...")
|
buf.WriteString("...") //nolint:revive // from buffer.go: "err is always nil"
|
||||||
}
|
}
|
||||||
return buf.String()
|
return buf.String()
|
||||||
}
|
}
|
||||||
|
|
@ -155,7 +158,6 @@ func (c *CommandRunner) truncate(buf bytes.Buffer) string {
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("exec", func() telegraf.Output {
|
outputs.Add("exec", func() telegraf.Output {
|
||||||
return &Exec{
|
return &Exec{
|
||||||
runner: &CommandRunner{},
|
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -6,11 +6,12 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestExec(t *testing.T) {
|
func TestExec(t *testing.T) {
|
||||||
|
|
@ -59,8 +60,7 @@ func TestExec(t *testing.T) {
|
||||||
s, _ := serializers.NewInfluxSerializer()
|
s, _ := serializers.NewInfluxSerializer()
|
||||||
e.SetSerializer(s)
|
e.SetSerializer(s)
|
||||||
|
|
||||||
e.Connect()
|
require.NoError(t, e.Connect())
|
||||||
|
|
||||||
require.Equal(t, tt.err, e.Write(tt.metrics) != nil)
|
require.Equal(t, tt.err, e.Write(tt.metrics) != nil)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,13 +11,14 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC)
|
var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC)
|
||||||
|
|
@ -85,16 +86,20 @@ func runOutputConsumerProgram() {
|
||||||
parser := influx.NewStreamParser(os.Stdin)
|
parser := influx.NewStreamParser(os.Stdin)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
metric, err := parser.Next()
|
m, err := parser.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == influx.EOF {
|
if err == influx.EOF {
|
||||||
return // stream ended
|
return // stream ended
|
||||||
}
|
}
|
||||||
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
|
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
|
||||||
|
//nolint:errcheck,revive // Test will fail anyway
|
||||||
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
|
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
|
||||||
|
//nolint:revive // error code is important for this "test"
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
//nolint:errcheck,revive // Test will fail anyway
|
||||||
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
|
||||||
|
//nolint:revive // error code is important for this "test"
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -104,8 +109,10 @@ func runOutputConsumerProgram() {
|
||||||
now,
|
now,
|
||||||
)
|
)
|
||||||
|
|
||||||
if !testutil.MetricEqual(expected, metric) {
|
if !testutil.MetricEqual(expected, m) {
|
||||||
|
//nolint:errcheck,revive // Test will fail anyway
|
||||||
fmt.Fprintf(os.Stderr, "metric doesn't match expected\n")
|
fmt.Fprintf(os.Stderr, "metric doesn't match expected\n")
|
||||||
|
//nolint:revive // error code is important for this "test"
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
|
|
@ -20,7 +20,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFileExistingFile(t *testing.T) {
|
func TestFileExistingFile(t *testing.T) {
|
||||||
fh := createFile()
|
fh := createFile(t)
|
||||||
defer os.Remove(fh.Name())
|
defer os.Remove(fh.Name())
|
||||||
s, _ := serializers.NewInfluxSerializer()
|
s, _ := serializers.NewInfluxSerializer()
|
||||||
f := File{
|
f := File{
|
||||||
|
|
@ -29,20 +29,20 @@ func TestFileExistingFile(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Connect()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
validateFile(fh.Name(), expExistFile, t)
|
validateFile(t, fh.Name(), expExistFile)
|
||||||
|
|
||||||
err = f.Close()
|
err = f.Close()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFileNewFile(t *testing.T) {
|
func TestFileNewFile(t *testing.T) {
|
||||||
s, _ := serializers.NewInfluxSerializer()
|
s, _ := serializers.NewInfluxSerializer()
|
||||||
fh := tmpFile()
|
fh := tmpFile(t)
|
||||||
defer os.Remove(fh)
|
defer os.Remove(fh)
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{fh},
|
Files: []string{fh},
|
||||||
|
|
@ -50,23 +50,23 @@ func TestFileNewFile(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Connect()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
validateFile(fh, expNewFile, t)
|
validateFile(t, fh, expNewFile)
|
||||||
|
|
||||||
err = f.Close()
|
err = f.Close()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFileExistingFiles(t *testing.T) {
|
func TestFileExistingFiles(t *testing.T) {
|
||||||
fh1 := createFile()
|
fh1 := createFile(t)
|
||||||
defer os.Remove(fh1.Name())
|
defer os.Remove(fh1.Name())
|
||||||
fh2 := createFile()
|
fh2 := createFile(t)
|
||||||
defer os.Remove(fh2.Name())
|
defer os.Remove(fh2.Name())
|
||||||
fh3 := createFile()
|
fh3 := createFile(t)
|
||||||
defer os.Remove(fh3.Name())
|
defer os.Remove(fh3.Name())
|
||||||
|
|
||||||
s, _ := serializers.NewInfluxSerializer()
|
s, _ := serializers.NewInfluxSerializer()
|
||||||
|
|
@ -76,26 +76,26 @@ func TestFileExistingFiles(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Connect()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
validateFile(fh1.Name(), expExistFile, t)
|
validateFile(t, fh1.Name(), expExistFile)
|
||||||
validateFile(fh2.Name(), expExistFile, t)
|
validateFile(t, fh2.Name(), expExistFile)
|
||||||
validateFile(fh3.Name(), expExistFile, t)
|
validateFile(t, fh3.Name(), expExistFile)
|
||||||
|
|
||||||
err = f.Close()
|
err = f.Close()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFileNewFiles(t *testing.T) {
|
func TestFileNewFiles(t *testing.T) {
|
||||||
s, _ := serializers.NewInfluxSerializer()
|
s, _ := serializers.NewInfluxSerializer()
|
||||||
fh1 := tmpFile()
|
fh1 := tmpFile(t)
|
||||||
defer os.Remove(fh1)
|
defer os.Remove(fh1)
|
||||||
fh2 := tmpFile()
|
fh2 := tmpFile(t)
|
||||||
defer os.Remove(fh2)
|
defer os.Remove(fh2)
|
||||||
fh3 := tmpFile()
|
fh3 := tmpFile(t)
|
||||||
defer os.Remove(fh3)
|
defer os.Remove(fh3)
|
||||||
f := File{
|
f := File{
|
||||||
Files: []string{fh1, fh2, fh3},
|
Files: []string{fh1, fh2, fh3},
|
||||||
|
|
@ -103,23 +103,23 @@ func TestFileNewFiles(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Connect()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
validateFile(fh1, expNewFile, t)
|
validateFile(t, fh1, expNewFile)
|
||||||
validateFile(fh2, expNewFile, t)
|
validateFile(t, fh2, expNewFile)
|
||||||
validateFile(fh3, expNewFile, t)
|
validateFile(t, fh3, expNewFile)
|
||||||
|
|
||||||
err = f.Close()
|
err = f.Close()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFileBoth(t *testing.T) {
|
func TestFileBoth(t *testing.T) {
|
||||||
fh1 := createFile()
|
fh1 := createFile(t)
|
||||||
defer os.Remove(fh1.Name())
|
defer os.Remove(fh1.Name())
|
||||||
fh2 := tmpFile()
|
fh2 := tmpFile(t)
|
||||||
defer os.Remove(fh2)
|
defer os.Remove(fh2)
|
||||||
|
|
||||||
s, _ := serializers.NewInfluxSerializer()
|
s, _ := serializers.NewInfluxSerializer()
|
||||||
|
|
@ -129,16 +129,16 @@ func TestFileBoth(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Connect()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
validateFile(fh1.Name(), expExistFile, t)
|
validateFile(t, fh1.Name(), expExistFile)
|
||||||
validateFile(fh2, expNewFile, t)
|
validateFile(t, fh2, expNewFile)
|
||||||
|
|
||||||
err = f.Close()
|
err = f.Close()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFileStdout(t *testing.T) {
|
func TestFileStdout(t *testing.T) {
|
||||||
|
|
@ -154,52 +154,52 @@ func TestFileStdout(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := f.Connect()
|
err := f.Connect()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Write(testutil.MockMetrics())
|
err = f.Write(testutil.MockMetrics())
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = f.Close()
|
err = f.Close()
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
outC := make(chan string)
|
outC := make(chan string)
|
||||||
// copy the output in a separate goroutine so printing can't block indefinitely
|
// copy the output in a separate goroutine so printing can't block indefinitely
|
||||||
go func() {
|
go func() {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
io.Copy(&buf, r)
|
_, err := io.Copy(&buf, r)
|
||||||
|
require.NoError(t, err)
|
||||||
outC <- buf.String()
|
outC <- buf.String()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// back to normal state
|
// back to normal state
|
||||||
w.Close()
|
err = w.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// restoring the real stdout
|
// restoring the real stdout
|
||||||
os.Stdout = old
|
os.Stdout = old
|
||||||
out := <-outC
|
out := <-outC
|
||||||
|
|
||||||
assert.Equal(t, expNewFile, out)
|
require.Equal(t, expNewFile, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createFile() *os.File {
|
func createFile(t *testing.T) *os.File {
|
||||||
f, err := os.CreateTemp("", "")
|
f, err := os.CreateTemp("", "")
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
panic(err)
|
|
||||||
}
|
_, err = f.WriteString("cpu,cpu=cpu0 value=100 1455312810012459582\n")
|
||||||
f.WriteString("cpu,cpu=cpu0 value=100 1455312810012459582\n")
|
require.NoError(t, err)
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
func tmpFile() string {
|
func tmpFile(t *testing.T) string {
|
||||||
d, err := os.MkdirTemp("", "")
|
d, err := os.MkdirTemp("", "")
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return d + internal.RandomString(10)
|
return d + internal.RandomString(10)
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateFile(fname, expS string, t *testing.T) {
|
func validateFile(t *testing.T, fileName, expS string) {
|
||||||
buf, err := os.ReadFile(fname)
|
buf, err := os.ReadFile(fileName)
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
panic(err)
|
require.Equal(t, expS, string(buf))
|
||||||
}
|
|
||||||
assert.Equal(t, expS, string(buf))
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue