chore: Wrap long lines in plugins/outputs (#12179)
This commit is contained in:
parent
969188e9db
commit
8e9a513157
|
|
@ -122,7 +122,11 @@ func (a *ApplicationInsights) createTelemetry(metric telegraf.Metric) []appinsig
|
|||
return a.createTelemetryForUnusedFields(metric, nil)
|
||||
}
|
||||
|
||||
func (a *ApplicationInsights) createSimpleMetricTelemetry(metric telegraf.Metric, fieldName string, useFieldNameInTelemetryName bool) *appinsights.MetricTelemetry {
|
||||
func (a *ApplicationInsights) createSimpleMetricTelemetry(
|
||||
metric telegraf.Metric,
|
||||
fieldName string,
|
||||
useFieldNameInTelemetryName bool,
|
||||
) *appinsights.MetricTelemetry {
|
||||
telemetryValue, err := getFloat64TelemetryPropertyValue([]string{fieldName}, metric, nil)
|
||||
if err != nil {
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -157,7 +157,12 @@ func TestAggregateMetricCreated(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
transmitter.AssertNumberOfCalls(t, "Track", 1+len(tt.additionalMetricValueFields))
|
||||
var pAggregateTelemetry *appinsights.AggregateMetricTelemetry
|
||||
require.IsType(t, pAggregateTelemetry, transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0), "Expected last telemetry to be AggregateMetricTelemetry")
|
||||
require.IsType(
|
||||
t,
|
||||
pAggregateTelemetry,
|
||||
transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0),
|
||||
"Expected last telemetry to be AggregateMetricTelemetry",
|
||||
)
|
||||
aggregateTelemetry := transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry)
|
||||
verifyAggregateTelemetry(t, m, tt.valueField, tt.countField, aggregateTelemetry)
|
||||
|
||||
|
|
@ -183,7 +188,12 @@ func TestSimpleMetricCreated(t *testing.T) {
|
|||
{"value is of wrong type", map[string]interface{}{"value": "alpha", "count": 15}, "", []string{"count"}},
|
||||
{"count is of wrong type", map[string]interface{}{"value": 23.77, "count": 7.5}, "", []string{"count", "value"}},
|
||||
{"count is out of range", map[string]interface{}{"value": -98.45e4, "count": math.MaxUint64 - uint64(20)}, "", []string{"value", "count"}},
|
||||
{"several additional fields", map[string]interface{}{"alpha": 10, "bravo": "bravo", "charlie": 30, "delta": 40.7}, "", []string{"alpha", "charlie", "delta"}},
|
||||
{
|
||||
"several additional fields",
|
||||
map[string]interface{}{"alpha": 10, "bravo": "bravo", "charlie": 30, "delta": 40.7},
|
||||
"",
|
||||
[]string{"alpha", "charlie", "delta"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
|
|||
|
|
@ -68,7 +68,11 @@ type localClient interface {
|
|||
type ingestorFactory func(localClient, string, string) (localIngestor, error)
|
||||
|
||||
const createTableCommand = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
|
||||
const createTableMappingCommand = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
|
||||
const createTableMappingCommand = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", ` +
|
||||
`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", ` +
|
||||
`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` +
|
||||
`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", ` +
|
||||
`"Properties":{"Path":"$[\'timestamp\']"}}]'`
|
||||
const managedIngestion = "managed"
|
||||
const queuedIngestion = "queued"
|
||||
|
||||
|
|
@ -278,13 +282,15 @@ func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context,
|
|||
}
|
||||
}
|
||||
|
||||
createTableMappingstmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName))
|
||||
createTableMappingStmt :=
|
||||
kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).
|
||||
UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName))
|
||||
if adx.client != nil {
|
||||
if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
|
||||
if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if adx.kustoClient != nil {
|
||||
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
|
||||
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,14 +13,19 @@ import (
|
|||
|
||||
"github.com/Azure/azure-kusto-go/kusto"
|
||||
"github.com/Azure/azure-kusto-go/kusto/ingest"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
telegrafJson "github.com/influxdata/telegraf/plugins/serializers/json"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const createTableCommandExpected = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
|
||||
const createTableMappingCommandExpected = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
|
||||
const createTableMappingCommandExpected = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", ` +
|
||||
`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", ` +
|
||||
`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` +
|
||||
`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", ` +
|
||||
`"Properties":{"Path":"$[\'timestamp\']"}}]'`
|
||||
|
||||
func TestWrite(t *testing.T) {
|
||||
testCases := []struct {
|
||||
|
|
@ -206,7 +211,10 @@ func TestWriteWithType(t *testing.T) {
|
|||
testutil.TestMetric(1.0, "test2"),
|
||||
testutil.TestMetric(2.0, "test3"),
|
||||
}
|
||||
expectedResultMap2 := map[string]string{"test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`, "test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`}
|
||||
expectedResultMap2 := map[string]string{
|
||||
"test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`,
|
||||
"test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`,
|
||||
}
|
||||
// List of tests
|
||||
testCases := []struct {
|
||||
name string
|
||||
|
|
|
|||
|
|
@ -35,9 +35,21 @@ type logStreamContainer struct {
|
|||
|
||||
// Cloudwatch Logs service interface
|
||||
type cloudWatchLogs interface {
|
||||
DescribeLogGroups(context.Context, *cloudwatchlogs.DescribeLogGroupsInput, ...func(options *cloudwatchlogs.Options)) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
|
||||
DescribeLogStreams(context.Context, *cloudwatchlogs.DescribeLogStreamsInput, ...func(options *cloudwatchlogs.Options)) (*cloudwatchlogs.DescribeLogStreamsOutput, error)
|
||||
CreateLogStream(context.Context, *cloudwatchlogs.CreateLogStreamInput, ...func(options *cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error)
|
||||
DescribeLogGroups(
|
||||
context.Context,
|
||||
*cloudwatchlogs.DescribeLogGroupsInput,
|
||||
...func(options *cloudwatchlogs.Options),
|
||||
) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
|
||||
DescribeLogStreams(
|
||||
context.Context,
|
||||
*cloudwatchlogs.DescribeLogStreamsInput,
|
||||
...func(options *cloudwatchlogs.Options),
|
||||
) (*cloudwatchlogs.DescribeLogStreamsOutput, error)
|
||||
CreateLogStream(
|
||||
context.Context,
|
||||
*cloudwatchlogs.CreateLogStreamInput,
|
||||
...func(options *cloudwatchlogs.Options),
|
||||
) (*cloudwatchlogs.CreateLogStreamOutput, error)
|
||||
PutLogEvents(context.Context, *cloudwatchlogs.PutLogEventsInput, ...func(options *cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error)
|
||||
}
|
||||
|
||||
|
|
@ -279,7 +291,11 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
|||
//Check if message size is not fit to batch
|
||||
if len(logData) > maxLogMessageLength {
|
||||
metricStr := fmt.Sprintf("%v", m)
|
||||
c.Log.Errorf("Processing metric '%s...', message is too large to fit to aws max log message size: %d (bytes) !", metricStr[0:maxLogMessageLength/1000], maxLogMessageLength)
|
||||
c.Log.Errorf(
|
||||
"Processing metric '%s...', message is too large to fit to aws max log message size: %d (bytes) !",
|
||||
metricStr[0:maxLogMessageLength/1000],
|
||||
maxLogMessageLength,
|
||||
)
|
||||
continue
|
||||
}
|
||||
//Batching log messages
|
||||
|
|
|
|||
|
|
@ -28,11 +28,19 @@ func (c *mockCloudWatchLogs) Init(lsName string) {
|
|||
c.pushedLogEvents = make([]types.InputLogEvent, 0)
|
||||
}
|
||||
|
||||
func (c *mockCloudWatchLogs) DescribeLogGroups(context.Context, *cloudwatchlogsV2.DescribeLogGroupsInput, ...func(options *cloudwatchlogsV2.Options)) (*cloudwatchlogsV2.DescribeLogGroupsOutput, error) {
|
||||
func (c *mockCloudWatchLogs) DescribeLogGroups(
|
||||
context.Context,
|
||||
*cloudwatchlogsV2.DescribeLogGroupsInput,
|
||||
...func(options *cloudwatchlogsV2.Options),
|
||||
) (*cloudwatchlogsV2.DescribeLogGroupsOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *mockCloudWatchLogs) DescribeLogStreams(context.Context, *cloudwatchlogsV2.DescribeLogStreamsInput, ...func(options *cloudwatchlogsV2.Options)) (*cloudwatchlogsV2.DescribeLogStreamsOutput, error) {
|
||||
func (c *mockCloudWatchLogs) DescribeLogStreams(
|
||||
context.Context,
|
||||
*cloudwatchlogsV2.DescribeLogStreamsInput,
|
||||
...func(options *cloudwatchlogsV2.Options),
|
||||
) (*cloudwatchlogsV2.DescribeLogStreamsOutput, error) {
|
||||
arn := "arn"
|
||||
creationTime := time.Now().Unix()
|
||||
sequenceToken := "arbitraryToken"
|
||||
|
|
@ -51,10 +59,20 @@ func (c *mockCloudWatchLogs) DescribeLogStreams(context.Context, *cloudwatchlogs
|
|||
}
|
||||
return output, nil
|
||||
}
|
||||
func (c *mockCloudWatchLogs) CreateLogStream(context.Context, *cloudwatchlogsV2.CreateLogStreamInput, ...func(options *cloudwatchlogsV2.Options)) (*cloudwatchlogsV2.CreateLogStreamOutput, error) {
|
||||
|
||||
func (c *mockCloudWatchLogs) CreateLogStream(
|
||||
context.Context,
|
||||
*cloudwatchlogsV2.CreateLogStreamInput,
|
||||
...func(options *cloudwatchlogsV2.Options),
|
||||
) (*cloudwatchlogsV2.CreateLogStreamOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (c *mockCloudWatchLogs) PutLogEvents(_ context.Context, input *cloudwatchlogsV2.PutLogEventsInput, _ ...func(options *cloudwatchlogsV2.Options)) (*cloudwatchlogsV2.PutLogEventsOutput, error) {
|
||||
|
||||
func (c *mockCloudWatchLogs) PutLogEvents(
|
||||
_ context.Context,
|
||||
input *cloudwatchlogsV2.PutLogEventsInput,
|
||||
_ ...func(options *cloudwatchlogsV2.Options),
|
||||
) (*cloudwatchlogsV2.PutLogEventsOutput, error) {
|
||||
sequenceToken := "arbitraryToken"
|
||||
output := &cloudwatchlogsV2.PutLogEventsOutput{NextSequenceToken: &sequenceToken}
|
||||
//Saving messages
|
||||
|
|
|
|||
|
|
@ -323,7 +323,13 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
if res.Errors {
|
||||
for id, err := range res.Failed() {
|
||||
a.Log.Errorf("Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"])
|
||||
a.Log.Errorf(
|
||||
"Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s",
|
||||
id,
|
||||
err.Error.Reason,
|
||||
err.Error.CausedBy["reason"],
|
||||
err.Error.CausedBy["type"],
|
||||
)
|
||||
break
|
||||
}
|
||||
return fmt.Errorf("elasticsearch failed to index %d metrics", len(res.Failed()))
|
||||
|
|
|
|||
|
|
@ -128,7 +128,11 @@ func (g *Graphite) checkEOF(conn net.Conn) error {
|
|||
b := make([]byte, 1024)
|
||||
|
||||
if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
|
||||
g.Log.Debugf("Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly", err, conn.RemoteAddr().String())
|
||||
g.Log.Debugf(
|
||||
"Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly",
|
||||
err,
|
||||
conn.RemoteAddr().String(),
|
||||
)
|
||||
err = conn.Close()
|
||||
g.Log.Debugf("Failed to close the connection: %v", err)
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -11,11 +11,10 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
internalaws "github.com/influxdata/telegraf/plugins/common/aws"
|
||||
|
|
@ -539,13 +538,35 @@ func TestOAuthAuthorizationCodeGrant(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
tmpTokenURI := u.String() + "/token"
|
||||
data := []byte(fmt.Sprintf("{\n \"type\": \"service_account\",\n \"project_id\": \"my-project\",\n \"private_key_id\": \"223423436436453645363456\",\n \"private_key\": \"-----BEGIN PRIVATE KEY-----\\nMIICXAIBAAKBgQDX7Plvu0MJtA9TrusYtQnAogsdiYJZd9wfFIjH5FxE3SWJ4KAIE+yRWRqcqX8XnpieQLaNsfXhDPWLkWngTDydk4NO/jlAQk0e6+9+NeiZ2ViIHmtXERb9CyiiWUmo+YCd69lhzSEIMK9EPBSDHQTgQMtEfGak03G5rx3MCakE1QIDAQABAoGAOjRU4Lt3zKvO3d3u3ZAfet+zY1jn3DolCfO9EzUJcj6ymcIFIWhNgrikJcrCyZkkxrPnAbcQ8oNNxTuDcMTcKZbnyUnlQj5NtVuty5Q+zgf3/Q2pRhaE+TwrpOJ+ETtVp9R/PrPN2NC5wPo289fPNWFYkd4DPbdWZp5AJHz1XYECQQD3kKpinJxMYp9FQ1Qj1OkxGln0KPgdqRYjjW/rXI4/hUodfg+xXWHPFSGj3AgEjQIvuengbOAeH3qowF1uxVTlAkEA30hXM3EbboMCDQzNRNkkV9EiZ0MZXhj1aIGl+sQZOmOeFdcdjGkDdsA42nmaYqXCD9KAvc+S/tGJaa0Qg0VhMQJAb2+TAqh0Qn3yK39PFIH2JcAy1ZDLfq5p5L75rfwPm9AnuHbSIYhjSo+8gMG+ai3+2fTZrcfUajrJP8S3SfFRcQJBANQQPOHatxcKzlPeqMaPBXlyY553mAxK4CnVmPLGdL+EBYzwtlu5EVUj09uMSxkOHXYxk5yzHQVvtXbsrBZBOsECQBJLlkMjJmXrIIdLPmHQWL3bm9MMg1PqzupSEwz6cyrGuIIm/X91pDyxCHaKYWp38FXBkYAgohI8ow5/sgRvU5w=\\n-----END PRIVATE KEY-----\\n\",\n \"client_email\": \"test-service-account-email@example.iam.gserviceaccount.com\",\n \"client_id\": \"110300009813738675309\",\n \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\n \"token_uri\": \"%s\",\n \"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\n \"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/test-service-account-email@example.iam.gserviceaccount.com\"\n}", tmpTokenURI))
|
||||
data := []byte(
|
||||
fmt.Sprintf(
|
||||
"{\n \"type\": \"service_account\",\n \"project_id\": \"my-project\",\n "+
|
||||
"\"private_key_id\": \"223423436436453645363456\",\n \"private_key\": "+
|
||||
"\"-----BEGIN PRIVATE KEY-----\\nMIICXAIBAAKBgQDX7Plvu0MJtA9TrusYtQnAogsdiYJZd9wfFIjH5FxE3SWJ4KAIE+yRWRqcqX8XnpieQLaNsfXhDPWLkWngTDydk4NO/"+
|
||||
"jlAQk0e6+9+NeiZ2ViIHmtXERb9CyiiWUmo+YCd69lhzSEIMK9EPBSDHQTgQMtEfGak03G5rx3MCakE1QIDAQABAoGAOjRU4Lt3zKvO3d3u3ZAfet+zY1jn3DolCfO9EzUJcj6ymc"+
|
||||
"IFIWhNgrikJcrCyZkkxrPnAbcQ8oNNxTuDcMTcKZbnyUnlQj5NtVuty5Q+zgf3/Q2pRhaE+TwrpOJ+ETtVp9R/PrPN2NC5wPo289fPNWFYkd4DPbdWZp5AJHz1XYECQQD3kKpinJx"+
|
||||
"MYp9FQ1Qj1OkxGln0KPgdqRYjjW/rXI4/hUodfg+xXWHPFSGj3AgEjQIvuengbOAeH3qowF1uxVTlAkEA30hXM3EbboMCDQzNRNkkV9EiZ0MZXhj1aIGl+sQZOmOeFdcdjGkDdsA4"+
|
||||
"2nmaYqXCD9KAvc+S/tGJaa0Qg0VhMQJAb2+TAqh0Qn3yK39PFIH2JcAy1ZDLfq5p5L75rfwPm9AnuHbSIYhjSo+8gMG+ai3+2fTZrcfUajrJP8S3SfFRcQJBANQQPOHatxcKzlPeq"+
|
||||
"MaPBXlyY553mAxK4CnVmPLGdL+EBYzwtlu5EVUj09uMSxkOHXYxk5yzHQVvtXbsrBZBOsECQBJLlkMjJmXrIIdLPmHQWL3bm9MMg1PqzupSEwz6cyrGuIIm/X91pDyxCHaKYWp38F"+
|
||||
"XBkYAgohI8ow5/sgRvU5w=\\n-----END PRIVATE KEY-----\\n\",\n "+
|
||||
"\"client_email\": \"test-service-account-email@example.iam.gserviceaccount.com\",\n \"client_id\": \"110300009813738675309\",\n "+
|
||||
"\"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\n \"token_uri\": \"%s\",\n "+
|
||||
"\"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\n "+
|
||||
"\"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/test-service-account-email@example.iam.gserviceaccount.com\"\n}",
|
||||
tmpTokenURI,
|
||||
),
|
||||
)
|
||||
_, err = tmpFile.Write(data)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, tmpFile.Close())
|
||||
|
||||
const token = "eyJhbGciOiJSUzI1NiIsImtpZCI6Ijg2NzUzMDliMjJiMDFiZTU2YzIxM2M5ODU0MGFiNTYzYmZmNWE1OGMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwOi8vMTI3LjAuMC4xOjU4MDI1LyIsImF6cCI6InRlc3Qtc2VydmljZS1hY2NvdW50LWVtYWlsQGV4YW1wbGUuY29tIiwiZW1haWwiOiJ0ZXN0LXNlcnZpY2UtYWNjb3VudC1lbWFpbEBleGFtcGxlLmNvbSIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjk0NjY4NDgwMCwiaWF0Ijo5NDY2ODEyMDAsImlzcyI6Imh0dHBzOi8vYWNjb3VudHMudGVzdC5jb20iLCJzdWIiOiIxMTAzMDAwMDk4MTM3Mzg2NzUzMDkifQ.qi2LsXP2o6nl-rbYKUlHAgTBY0QoU7Nhty5NGR4GMdc8OoGEPW-vlD0WBSaKSr11vyFcIO4ftFDWXElo9Ut-AIQPKVxinsjHIU2-LoIATgI1kyifFLyU_pBecwcI4CIXEcDK5wEkfonWFSkyDZHBeZFKbJXlQXtxj0OHvQ-DEEepXLuKY6v3s4U6GyD9_ppYUy6gzDZPYUbfPfgxCj_Jbv6qkLU0DiZ7F5-do6X6n-qkpgCRLTGHcY__rn8oe8_pSimsyJEeY49ZQ5lj4mXkVCwgL9bvL1_eW1p6sgbHaBnPKVPbM7S1_cBmzgSonm__qWyZUxfDgNdigtNsvzBQTg"
|
||||
const token = "eyJhbGciOiJSUzI1NiIsImtpZCI6Ijg2NzUzMDliMjJiMDFiZTU2YzIxM2M5ODU0MGFiNTYzYmZmNWE1OGMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwOi8vMTI3LjAuMC4x" +
|
||||
"OjU4MDI1LyIsImF6cCI6InRlc3Qtc2VydmljZS1hY2NvdW50LWVtYWlsQGV4YW1wbGUuY29tIiwiZW1haWwiOiJ0ZXN0LXNlcnZpY2UtYWNjb3VudC1lbWFpbEBleGFtcGxlLmNvbSIsImVtY" +
|
||||
"WlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjk0NjY4NDgwMCwiaWF0Ijo5NDY2ODEyMDAsImlzcyI6Imh0dHBzOi8vYWNjb3VudHMudGVzdC5jb20iLCJzdWIiOiIxMTAzMDAwMDk4MTM3Mzg2Nz" +
|
||||
"UzMDkifQ.qi2LsXP2o6nl-rbYKUlHAgTBY0QoU7Nhty5NGR4GMdc8OoGEPW-vlD0WBSaKSr11vyFcIO4ftFDWXElo9Ut-AIQPKVxinsjHIU2-LoIATgI1kyifFLyU_pBecwcI4CIXEcDK5wEk" +
|
||||
"fonWFSkyDZHBeZFKbJXlQXtxj0OHvQ-DEEepXLuKY6v3s4U6GyD9_ppYUy6gzDZPYUbfPfgxCj_Jbv6qkLU0DiZ7F5-do6X6n-qkpgCRLTGHcY__rn8oe8_pSimsyJEeY49ZQ5lj4mXkVCwgL" +
|
||||
"9bvL1_eW1p6sgbHaBnPKVPbM7S1_cBmzgSonm__qWyZUxfDgNdigtNsvzBQTg"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
|
|||
|
|
@ -245,7 +245,10 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
|||
return nil
|
||||
}
|
||||
if prodErr.Err == sarama.ErrInvalidTimestamp {
|
||||
k.Log.Error("The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch")
|
||||
k.Log.Error(
|
||||
"The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; " +
|
||||
"dropping batch",
|
||||
)
|
||||
return nil
|
||||
}
|
||||
return prodErr //nolint:staticcheck // Return first error encountered
|
||||
|
|
|
|||
|
|
@ -5,12 +5,12 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/newrelic/newrelic-telemetry-sdk-go/telemetry"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/newrelic/newrelic-telemetry-sdk-go/telemetry"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
|
|
@ -24,7 +24,7 @@ func TestBasic(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
err = nr.Write(testutil.MockMetrics())
|
||||
assert.Contains(t, err.Error(), "unable to harvest metrics")
|
||||
require.Contains(t, err.Error(), "unable to harvest metrics")
|
||||
}
|
||||
|
||||
func TestNewRelic_Write(t *testing.T) {
|
||||
|
|
@ -85,8 +85,9 @@ func TestNewRelic_Write(t *testing.T) {
|
|||
metrics: []telegraf.Metric{
|
||||
testutil.TestMetric(math.MaxFloat64, "test_maxfloat64"),
|
||||
},
|
||||
wantErr: false,
|
||||
auditMessage: `"metrics":[{"name":"test_maxfloat64.value","type":"gauge","value":1.7976931348623157e+308,"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`,
|
||||
wantErr: false,
|
||||
auditMessage: `"metrics":[{"name":"test_maxfloat64.value","type":"gauge","value":1.7976931348623157e+308,` +
|
||||
`"timestamp":1257894000000,"attributes":{"tag1":"value1"}}]`,
|
||||
},
|
||||
{
|
||||
name: "Test: Test NAN ",
|
||||
|
|
@ -112,11 +113,11 @@ func TestNewRelic_Write(t *testing.T) {
|
|||
}
|
||||
})
|
||||
err := nr.Write(tt.metrics)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
if auditLog["data"] != nil {
|
||||
assert.Contains(t, auditLog["data"], tt.auditMessage)
|
||||
require.Contains(t, auditLog["data"], tt.auditMessage)
|
||||
} else {
|
||||
assert.Contains(t, "", tt.auditMessage)
|
||||
require.Contains(t, "", tt.auditMessage)
|
||||
}
|
||||
|
||||
if (err != nil) != tt.wantErr {
|
||||
|
|
|
|||
|
|
@ -87,7 +87,9 @@ func newPostgresql() *Postgresql {
|
|||
_ = p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`))
|
||||
_ = p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
|
||||
_ = p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`))
|
||||
_ = p.TagTableAddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
|
||||
_ = p.TagTableAddColumnTemplates[0].UnmarshalText(
|
||||
[]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`),
|
||||
)
|
||||
|
||||
return p
|
||||
}
|
||||
|
|
@ -445,7 +447,8 @@ func (p *Postgresql) writeTagTable(ctx context.Context, db dbh, tableSource *Tab
|
|||
return fmt.Errorf("copying into tags temp table: %w", err)
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(ctx, fmt.Sprintf("INSERT INTO %s SELECT * FROM %s ORDER BY tag_id ON CONFLICT (tag_id) DO NOTHING", ident.Sanitize(), identTemp.Sanitize())); err != nil {
|
||||
insert := fmt.Sprintf("INSERT INTO %s SELECT * FROM %s ORDER BY tag_id ON CONFLICT (tag_id) DO NOTHING", ident.Sanitize(), identTemp.Sanitize())
|
||||
if _, err := tx.Exec(ctx, insert); err != nil {
|
||||
return fmt.Errorf("inserting into tags table: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -72,7 +72,8 @@ A simple example for usage with TimescaleDB would be:
|
|||
|
||||
...where the defaults for the other templates would be automatically applied.
|
||||
|
||||
A very complex example for versions of TimescaleDB which don't support adding columns to compressed hypertables (v<2.1.0), using views and unions to emulate the functionality, would be:
|
||||
A very complex example for versions of TimescaleDB which don't support adding columns to compressed hypertables (v<2.1.0),
|
||||
using views and unions to emulate the functionality, would be:
|
||||
|
||||
[outputs.postgresql]
|
||||
schema = "telegraf"
|
||||
|
|
|
|||
|
|
@ -17,7 +17,8 @@ import (
|
|||
//go:embed sample.conf
|
||||
var sampleConfig string
|
||||
|
||||
const deprecationMsg = "Error: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion."
|
||||
const deprecationMsg = "Error: this Riemann output plugin will be deprecated in a future release, " +
|
||||
"see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion."
|
||||
|
||||
type Riemann struct {
|
||||
URL string `toml:"url"`
|
||||
|
|
|
|||
|
|
@ -440,7 +440,17 @@ func TestGetStackdriverLabels(t *testing.T) {
|
|||
{Key: "device", Value: "local"},
|
||||
{Key: "reserve", Value: "publication"},
|
||||
{Key: "xpfqacltlmpguimhtjlou2qlmf9uqqwk3teajwlwqkoxtsppbnjksaxvzc1aa973pho9m96gfnl5op8ku7sv93rexyx42qe3zty12ityv", Value: "keyquota"},
|
||||
{Key: "valuequota", Value: "icym5wcpejnhljcvy2vwk15svmhrtueoppwlvix61vlbaeedufn1g6u4jgwjoekwew9s2dboxtgrkiyuircnl8h1lbzntt9gzcf60qunhxurhiz0g2bynzy1v6eyn4ravndeiiugobsrsj2bfaguahg4gxn7nx4irwfknunhkk6jdlldevawj8levebjajcrcbeugewd14fa8o34ycfwx2ymalyeqxhfqrsksxnii2deqq6cghrzi6qzwmittkzdtye3imoygqmjjshiskvnzz1e4ipd9c6wfor5jsygn1kvcg6jm4clnsl1fnxotbei9xp4swrkjpgursmfmkyvxcgq9hoy435nwnolo3ipnvdlhk6pmlzpdjn6gqi3v9gv7jn5ro2p1t5ufxzfsvqq1fyrgoi7gvmttil1banh3cftkph1dcoaqfhl7y0wkvhwwvrmslmmxp1wedyn8bacd7akmjgfwdvcmrymbzvmrzfvq1gs1xnmmg8rsfxci2h6r1ralo3splf4f3bdg4c7cy0yy9qbxzxhcmdpwekwc7tdjs8uj6wmofm2aor4hum8nwyfwwlxy3yvsnbjy32oucsrmhcnu6l2i8laujkrhvsr9fcix5jflygznlydbqw5uhw1rg1g5wiihqumwmqgggemzoaivm3ut41vjaff4uqtqyuhuwblmuiphfkd7si49vgeeswzg7tpuw0oxmkesgibkcjtev2h9ouxzjs3eb71jffhdacyiuyhuxwvm5bnrjewbm4x2kmhgbirz3eoj7ijgplggdkx5vixufg65ont8zi1jabsuxx0vsqgprunwkugqkxg2r7iy6fmgs4lob4dlseinowkst6gp6x1ejreauyzjz7atzm3hbmr5rbynuqp4lxrnhhcbuoun69mavvaaki0bdz5ybmbbbz5qdv0odtpjo2aezat5uosjuhzbvic05jlyclikynjgfhencdkz3qcqzbzhnsynj1zdke0sk4zfpvfyryzsxv9pu0qm"},
|
||||
{
|
||||
Key: "valuequota",
|
||||
Value: "icym5wcpejnhljcvy2vwk15svmhrtueoppwlvix61vlbaeedufn1g6u4jgwjoekwew9s2dboxtgrkiyuircnl8h1lbzntt9gzcf60qunhxurhiz0g2bynzy1v6eyn4ravnde" +
|
||||
"iiugobsrsj2bfaguahg4gxn7nx4irwfknunhkk6jdlldevawj8levebjajcrcbeugewd14fa8o34ycfwx2ymalyeqxhfqrsksxnii2deqq6cghrzi6qzwmittkzdtye3imoygqm" +
|
||||
"jjshiskvnzz1e4ipd9c6wfor5jsygn1kvcg6jm4clnsl1fnxotbei9xp4swrkjpgursmfmkyvxcgq9hoy435nwnolo3ipnvdlhk6pmlzpdjn6gqi3v9gv7jn5ro2p1t5ufxzfsv" +
|
||||
"qq1fyrgoi7gvmttil1banh3cftkph1dcoaqfhl7y0wkvhwwvrmslmmxp1wedyn8bacd7akmjgfwdvcmrymbzvmrzfvq1gs1xnmmg8rsfxci2h6r1ralo3splf4f3bdg4c7cy0yy" +
|
||||
"9qbxzxhcmdpwekwc7tdjs8uj6wmofm2aor4hum8nwyfwwlxy3yvsnbjy32oucsrmhcnu6l2i8laujkrhvsr9fcix5jflygznlydbqw5uhw1rg1g5wiihqumwmqgggemzoaivm3u" +
|
||||
"t41vjaff4uqtqyuhuwblmuiphfkd7si49vgeeswzg7tpuw0oxmkesgibkcjtev2h9ouxzjs3eb71jffhdacyiuyhuxwvm5bnrjewbm4x2kmhgbirz3eoj7ijgplggdkx5vixufg" +
|
||||
"65ont8zi1jabsuxx0vsqgprunwkugqkxg2r7iy6fmgs4lob4dlseinowkst6gp6x1ejreauyzjz7atzm3hbmr5rbynuqp4lxrnhhcbuoun69mavvaaki0bdz5ybmbbbz5qdv0od" +
|
||||
"tpjo2aezat5uosjuhzbvic05jlyclikynjgfhencdkz3qcqzbzhnsynj1zdke0sk4zfpvfyryzsxv9pu0qm",
|
||||
},
|
||||
}
|
||||
|
||||
s := &Stackdriver{
|
||||
|
|
|
|||
|
|
@ -55,7 +55,11 @@ func (q *STOMP) Connect() error {
|
|||
}
|
||||
}
|
||||
|
||||
q.stomp, err = stomp.Connect(q.conn, stomp.ConnOpt.HeartBeat(time.Duration(q.HeartBeatSend), time.Duration(q.HeartBeatRec)), stomp.ConnOpt.Login(q.Username, q.Password))
|
||||
q.stomp, err = stomp.Connect(
|
||||
q.conn,
|
||||
stomp.ConnOpt.HeartBeat(time.Duration(q.HeartBeatSend), time.Duration(q.HeartBeatRec)),
|
||||
stomp.ConnOpt.Login(q.Username, q.Password),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -120,7 +120,13 @@ func TestSyslogMapperWithDefaultSdid(t *testing.T) {
|
|||
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||
require.NoError(t, err)
|
||||
str, _ := syslogMessage.String()
|
||||
require.Equal(t, "<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message", str, "Wrong syslog message")
|
||||
require.Equal(
|
||||
t,
|
||||
"<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" "+
|
||||
"value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message",
|
||||
str,
|
||||
"Wrong syslog message",
|
||||
)
|
||||
}
|
||||
|
||||
func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) {
|
||||
|
|
@ -158,7 +164,13 @@ func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) {
|
|||
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
|
||||
require.NoError(t, err)
|
||||
str, _ := syslogMessage.String()
|
||||
require.Equal(t, "<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message", str, "Wrong syslog message")
|
||||
require.Equal(
|
||||
t,
|
||||
"<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 "+
|
||||
"tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message",
|
||||
str,
|
||||
"Wrong syslog message",
|
||||
)
|
||||
}
|
||||
|
||||
func TestSyslogMapperWithNoSdids(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -52,7 +52,11 @@ type (
|
|||
WriteClient interface {
|
||||
CreateTable(context.Context, *timestreamwrite.CreateTableInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.CreateTableOutput, error)
|
||||
WriteRecords(context.Context, *timestreamwrite.WriteRecordsInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error)
|
||||
DescribeDatabase(context.Context, *timestreamwrite.DescribeDatabaseInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.DescribeDatabaseOutput, error)
|
||||
DescribeDatabase(
|
||||
context.Context,
|
||||
*timestreamwrite.DescribeDatabaseInput,
|
||||
...func(*timestreamwrite.Options),
|
||||
) (*timestreamwrite.DescribeDatabaseOutput, error)
|
||||
}
|
||||
)
|
||||
|
||||
|
|
@ -325,7 +329,11 @@ func (t *Timestream) logWriteToTimestreamError(err error, tableName *string) {
|
|||
|
||||
func (t *Timestream) createTableAndRetry(writeRecordsInput *timestreamwrite.WriteRecordsInput) error {
|
||||
if t.CreateTableIfNotExists {
|
||||
t.Log.Infof("Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'.", *writeRecordsInput.TableName, t.DatabaseName)
|
||||
t.Log.Infof(
|
||||
"Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'.",
|
||||
*writeRecordsInput.TableName,
|
||||
t.DatabaseName,
|
||||
)
|
||||
err := t.createTable(writeRecordsInput.TableName)
|
||||
if err == nil {
|
||||
t.Log.Infof("Table '%s' in database '%s' created. Retrying writing.", *writeRecordsInput.TableName, t.DatabaseName)
|
||||
|
|
@ -333,7 +341,8 @@ func (t *Timestream) createTableAndRetry(writeRecordsInput *timestreamwrite.Writ
|
|||
}
|
||||
t.Log.Errorf("Failed to create table '%s' in database '%s': %s. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName, err)
|
||||
} else {
|
||||
t.Log.Errorf("Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName)
|
||||
t.Log.Errorf("Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!",
|
||||
*writeRecordsInput.TableName, t.DatabaseName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,10 +4,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetTimestreamTime(t *testing.T) {
|
||||
|
|
@ -55,17 +54,34 @@ func TestPartitionRecords(t *testing.T) {
|
|||
func TestConvertValueSupported(t *testing.T) {
|
||||
intInputValues := []interface{}{-1, int8(-2), int16(-3), int32(-4), int64(-5)}
|
||||
intOutputValues := []string{"-1", "-2", "-3", "-4", "-5"}
|
||||
intOutputValueTypes := []types.MeasureValueType{types.MeasureValueTypeBigint, types.MeasureValueTypeBigint, types.MeasureValueTypeBigint, types.MeasureValueTypeBigint, types.MeasureValueTypeBigint}
|
||||
intOutputValueTypes := []types.MeasureValueType{
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
}
|
||||
testConvertValueSupportedCases(t, intInputValues, intOutputValues, intOutputValueTypes)
|
||||
|
||||
uintInputValues := []interface{}{uint(1), uint8(2), uint16(3), uint32(4), uint64(5)}
|
||||
uintOutputValues := []string{"1", "2", "3", "4", "5"}
|
||||
uintOutputValueTypes := []types.MeasureValueType{types.MeasureValueTypeBigint, types.MeasureValueTypeBigint, types.MeasureValueTypeBigint, types.MeasureValueTypeBigint, types.MeasureValueTypeBigint}
|
||||
uintOutputValueTypes := []types.MeasureValueType{
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
types.MeasureValueTypeBigint,
|
||||
}
|
||||
testConvertValueSupportedCases(t, uintInputValues, uintOutputValues, uintOutputValueTypes)
|
||||
|
||||
otherInputValues := []interface{}{"foo", float32(22.123), 22.1234, true}
|
||||
otherOutputValues := []string{"foo", "22.123", "22.1234", "true"}
|
||||
otherOutputValueTypes := []types.MeasureValueType{types.MeasureValueTypeVarchar, types.MeasureValueTypeDouble, types.MeasureValueTypeDouble, types.MeasureValueTypeBoolean}
|
||||
otherOutputValueTypes := []types.MeasureValueType{
|
||||
types.MeasureValueTypeVarchar,
|
||||
types.MeasureValueTypeDouble,
|
||||
types.MeasureValueTypeDouble,
|
||||
types.MeasureValueTypeBoolean,
|
||||
}
|
||||
testConvertValueSupportedCases(t, otherInputValues, otherOutputValues, otherOutputValueTypes)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,14 +41,28 @@ type mockTimestreamClient struct {
|
|||
WriteRecordsRequestCount int
|
||||
}
|
||||
|
||||
func (m *mockTimestreamClient) CreateTable(context.Context, *timestreamwrite.CreateTableInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.CreateTableOutput, error) {
|
||||
func (m *mockTimestreamClient) CreateTable(
|
||||
context.Context,
|
||||
*timestreamwrite.CreateTableInput,
|
||||
...func(*timestreamwrite.Options),
|
||||
) (*timestreamwrite.CreateTableOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockTimestreamClient) WriteRecords(context.Context, *timestreamwrite.WriteRecordsInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) {
|
||||
|
||||
func (m *mockTimestreamClient) WriteRecords(
|
||||
context.Context,
|
||||
*timestreamwrite.WriteRecordsInput,
|
||||
...func(*timestreamwrite.Options),
|
||||
) (*timestreamwrite.WriteRecordsOutput, error) {
|
||||
m.WriteRecordsRequestCount++
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockTimestreamClient) DescribeDatabase(context.Context, *timestreamwrite.DescribeDatabaseInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.DescribeDatabaseOutput, error) {
|
||||
|
||||
func (m *mockTimestreamClient) DescribeDatabase(
|
||||
context.Context,
|
||||
*timestreamwrite.DescribeDatabaseInput,
|
||||
...func(*timestreamwrite.Options),
|
||||
) (*timestreamwrite.DescribeDatabaseOutput, error) {
|
||||
return nil, fmt.Errorf("hello from DescribeDatabase")
|
||||
}
|
||||
|
||||
|
|
@ -474,13 +488,27 @@ type mockTimestreamErrorClient struct {
|
|||
ErrorToReturnOnWriteRecords error
|
||||
}
|
||||
|
||||
func (m *mockTimestreamErrorClient) CreateTable(context.Context, *timestreamwrite.CreateTableInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.CreateTableOutput, error) {
|
||||
func (m *mockTimestreamErrorClient) CreateTable(
|
||||
context.Context,
|
||||
*timestreamwrite.CreateTableInput,
|
||||
...func(*timestreamwrite.Options),
|
||||
) (*timestreamwrite.CreateTableOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockTimestreamErrorClient) WriteRecords(context.Context, *timestreamwrite.WriteRecordsInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) {
|
||||
|
||||
func (m *mockTimestreamErrorClient) WriteRecords(
|
||||
context.Context,
|
||||
*timestreamwrite.WriteRecordsInput,
|
||||
...func(*timestreamwrite.Options),
|
||||
) (*timestreamwrite.WriteRecordsOutput, error) {
|
||||
return nil, m.ErrorToReturnOnWriteRecords
|
||||
}
|
||||
func (m *mockTimestreamErrorClient) DescribeDatabase(context.Context, *timestreamwrite.DescribeDatabaseInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.DescribeDatabaseOutput, error) {
|
||||
|
||||
func (m *mockTimestreamErrorClient) DescribeDatabase(
|
||||
context.Context,
|
||||
*timestreamwrite.DescribeDatabaseInput,
|
||||
...func(*timestreamwrite.Options),
|
||||
) (*timestreamwrite.DescribeDatabaseOutput, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,10 +8,11 @@ import (
|
|||
"regexp"
|
||||
"strings"
|
||||
|
||||
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
serializer "github.com/influxdata/telegraf/plugins/serializers/wavefront"
|
||||
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
|
|
@ -134,7 +135,14 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
|
|||
return fmt.Errorf("wavefront sending error: %v", err)
|
||||
}
|
||||
w.Log.Errorf("non-retryable error during Wavefront.Write: %v", err)
|
||||
w.Log.Debugf("Non-retryable metric data: Name: %v, Value: %v, Timestamp: %v, Source: %v, PointTags: %v ", point.Metric, point.Value, point.Timestamp, point.Source, point.Tags)
|
||||
w.Log.Debugf(
|
||||
"Non-retryable metric data: Name: %v, Value: %v, Timestamp: %v, Source: %v, PointTags: %v ",
|
||||
point.Metric,
|
||||
point.Value,
|
||||
point.Timestamp,
|
||||
point.Source,
|
||||
point.Tags,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,12 +6,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
serializer "github.com/influxdata/telegraf/plugins/serializers/wavefront"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// default config used by Tests
|
||||
|
|
@ -64,7 +65,9 @@ func TestBuildMetrics(t *testing.T) {
|
|||
},
|
||||
{
|
||||
testMetric1,
|
||||
[]serializer.MetricPoint{{Metric: w.Prefix + "test.simple.metric", Value: 123, Timestamp: timestamp, Source: "testHost", Tags: map[string]string{"tag1": "value1"}}},
|
||||
[]serializer.MetricPoint{
|
||||
{Metric: w.Prefix + "test.simple.metric", Value: 123, Timestamp: timestamp, Source: "testHost", Tags: map[string]string{"tag1": "value1"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -101,7 +104,12 @@ func TestBuildMetricsStrict(t *testing.T) {
|
|||
{
|
||||
testutil.TestMetric(float64(1), "testing_just/another,metric:float", "metric2"),
|
||||
[]serializer.MetricPoint{
|
||||
{Metric: w.Prefix + "testing.just/another,metric-float", Value: 1, Timestamp: timestamp, Tags: map[string]string{"tag/1": "value1", "tag,2": "value2"}},
|
||||
{
|
||||
Metric: w.Prefix + "testing.just/another,metric-float",
|
||||
Value: 1,
|
||||
Timestamp: timestamp,
|
||||
Tags: map[string]string{"tag/1": "value1", "tag,2": "value2"},
|
||||
},
|
||||
{Metric: w.Prefix + "testing.metric2", Value: 1, Timestamp: timestamp, Tags: map[string]string{"tag/1": "value1", "tag,2": "value2"}},
|
||||
},
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue