chore: Fix linter findings for `revive:comment-spacings` (part 3) (#15898)
This commit is contained in:
parent
453d32bd81
commit
253a114622
|
|
@ -133,7 +133,7 @@ func (adx *AzureDataExplorer) writeTablePerMetric(metrics []telegraf.Metric) err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error {
|
func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error {
|
||||||
//serialise each metric in metrics - store in byte[]
|
// serialise each metric in metrics - store in byte[]
|
||||||
metricsArray := make([]byte, 0)
|
metricsArray := make([]byte, 0)
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
metricsInBytes, err := adx.serializer.Serialize(m)
|
metricsInBytes, err := adx.serializer.Serialize(m)
|
||||||
|
|
@ -147,7 +147,7 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error
|
||||||
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout))
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout))
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
//push metrics to a single table
|
// push metrics to a single table
|
||||||
format := ingest.FileFormat(ingest.JSON)
|
format := ingest.FileFormat(ingest.JSON)
|
||||||
err := adx.pushMetrics(ctx, format, adx.TableName, metricsArray)
|
err := adx.pushMetrics(ctx, format, adx.TableName, metricsArray)
|
||||||
return err
|
return err
|
||||||
|
|
@ -181,7 +181,7 @@ func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName s
|
||||||
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
|
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
|
||||||
return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err)
|
return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err)
|
||||||
}
|
}
|
||||||
//create a new ingestor client for the table
|
// create a new ingestor client for the table
|
||||||
tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
|
tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err)
|
return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err)
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ import (
|
||||||
|
|
||||||
func TestPubSub_WriteSingle(t *testing.T) {
|
func TestPubSub_WriteSingle(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error */},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
}
|
}
|
||||||
|
|
||||||
settings := pubsub.DefaultPublishSettings
|
settings := pubsub.DefaultPublishSettings
|
||||||
|
|
@ -31,7 +31,7 @@ func TestPubSub_WriteSingle(t *testing.T) {
|
||||||
|
|
||||||
func TestPubSub_WriteWithAttribute(t *testing.T) {
|
func TestPubSub_WriteWithAttribute(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error*/},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
}
|
}
|
||||||
|
|
||||||
settings := pubsub.DefaultPublishSettings
|
settings := pubsub.DefaultPublishSettings
|
||||||
|
|
@ -52,7 +52,7 @@ func TestPubSub_WriteWithAttribute(t *testing.T) {
|
||||||
|
|
||||||
func TestPubSub_WriteMultiple(t *testing.T) {
|
func TestPubSub_WriteMultiple(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error*/},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
{testutil.TestMetric("value_2", "test"), false},
|
{testutil.TestMetric("value_2", "test"), false},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -70,7 +70,7 @@ func TestPubSub_WriteMultiple(t *testing.T) {
|
||||||
|
|
||||||
func TestPubSub_WriteOverCountThreshold(t *testing.T) {
|
func TestPubSub_WriteOverCountThreshold(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error*/},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
{testutil.TestMetric("value_2", "test"), false},
|
{testutil.TestMetric("value_2", "test"), false},
|
||||||
{testutil.TestMetric("value_3", "test"), false},
|
{testutil.TestMetric("value_3", "test"), false},
|
||||||
{testutil.TestMetric("value_4", "test"), false},
|
{testutil.TestMetric("value_4", "test"), false},
|
||||||
|
|
@ -91,7 +91,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) {
|
||||||
|
|
||||||
func TestPubSub_WriteOverByteThreshold(t *testing.T) {
|
func TestPubSub_WriteOverByteThreshold(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error*/},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
{testutil.TestMetric("value_2", "test"), false},
|
{testutil.TestMetric("value_2", "test"), false},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -111,7 +111,7 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) {
|
||||||
|
|
||||||
func TestPubSub_WriteBase64Single(t *testing.T) {
|
func TestPubSub_WriteBase64Single(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error */},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
{testutil.TestMetric("value_2", "test"), false},
|
{testutil.TestMetric("value_2", "test"), false},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -145,7 +145,7 @@ func TestPubSub_Error(t *testing.T) {
|
||||||
|
|
||||||
func TestPubSub_WriteGzipSingle(t *testing.T) {
|
func TestPubSub_WriteGzipSingle(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error */},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
{testutil.TestMetric("value_2", "test"), false},
|
{testutil.TestMetric("value_2", "test"), false},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -167,7 +167,7 @@ func TestPubSub_WriteGzipSingle(t *testing.T) {
|
||||||
|
|
||||||
func TestPubSub_WriteGzipAndBase64Single(t *testing.T) {
|
func TestPubSub_WriteGzipAndBase64Single(t *testing.T) {
|
||||||
testMetrics := []testMetric{
|
testMetrics := []testMetric{
|
||||||
{testutil.TestMetric("value_1", "test"), false /*return error */},
|
{testutil.TestMetric("value_1", "test"), false},
|
||||||
{testutil.TestMetric("value_2", "test"), false},
|
{testutil.TestMetric("value_2", "test"), false},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -56,20 +56,20 @@ type cloudWatchLogs interface {
|
||||||
// CloudWatchLogs plugin object definition
|
// CloudWatchLogs plugin object definition
|
||||||
type CloudWatchLogs struct {
|
type CloudWatchLogs struct {
|
||||||
LogGroup string `toml:"log_group"`
|
LogGroup string `toml:"log_group"`
|
||||||
lg *types.LogGroup //log group data
|
lg *types.LogGroup // log group data
|
||||||
|
|
||||||
LogStream string `toml:"log_stream"`
|
LogStream string `toml:"log_stream"`
|
||||||
lsKey string //log stream source: tag or field
|
lsKey string // log stream source: tag or field
|
||||||
lsSource string //log stream source tag or field name
|
lsSource string // log stream source tag or field name
|
||||||
ls map[string]*logStreamContainer //log stream info
|
ls map[string]*logStreamContainer // log stream info
|
||||||
|
|
||||||
LDMetricName string `toml:"log_data_metric_name"`
|
LDMetricName string `toml:"log_data_metric_name"`
|
||||||
|
|
||||||
LDSource string `toml:"log_data_source"`
|
LDSource string `toml:"log_data_source"`
|
||||||
logDatKey string //log data source (tag or field)
|
logDatKey string // log data source (tag or field)
|
||||||
logDataSource string //log data source tag or field name
|
logDataSource string // log data source tag or field name
|
||||||
|
|
||||||
svc cloudWatchLogs //cloudwatch logs service
|
svc cloudWatchLogs // cloudwatch logs service
|
||||||
|
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
|
|
@ -79,7 +79,7 @@ type CloudWatchLogs struct {
|
||||||
const (
|
const (
|
||||||
// Log events must comply with the following
|
// Log events must comply with the following
|
||||||
// (https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatchlogs/#CloudWatchLogs.PutLogEvents):
|
// (https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatchlogs/#CloudWatchLogs.PutLogEvents):
|
||||||
maxLogMessageLength = 262144 - awsOverheadPerLogMessageBytes //In bytes
|
maxLogMessageLength = 262144 - awsOverheadPerLogMessageBytes // In bytes
|
||||||
maxBatchSizeBytes = 1048576 // The sum of all event messages in UTF-8, plus 26 bytes for each log event
|
maxBatchSizeBytes = 1048576 // The sum of all event messages in UTF-8, plus 26 bytes for each log event
|
||||||
awsOverheadPerLogMessageBytes = 26
|
awsOverheadPerLogMessageBytes = 26
|
||||||
maxFutureLogEventTimeOffset = time.Hour * 2 // None of the log events in the batch can be more than 2 hours in the future.
|
maxFutureLogEventTimeOffset = time.Hour * 2 // None of the log events in the batch can be more than 2 hours in the future.
|
||||||
|
|
@ -89,7 +89,7 @@ const (
|
||||||
|
|
||||||
maxItemsInBatch = 10000 // The maximum number of log events in a batch is 10,000.
|
maxItemsInBatch = 10000 // The maximum number of log events in a batch is 10,000.
|
||||||
|
|
||||||
//maxTimeSpanInBatch = time.Hour * 24 // A batch of log events in a single request cannot span more than 24 hours.
|
// maxTimeSpanInBatch = time.Hour * 24 // A batch of log events in a single request cannot span more than 24 hours.
|
||||||
// Otherwise, the operation fails.
|
// Otherwise, the operation fails.
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -164,8 +164,8 @@ func (c *CloudWatchLogs) Connect() error {
|
||||||
c.svc = cloudwatchlogs.NewFromConfig(cfg)
|
c.svc = cloudwatchlogs.NewFromConfig(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Find log group with name 'c.LogGroup'
|
// Find log group with name 'c.LogGroup'
|
||||||
if c.lg == nil { //In case connection is not retried, first time
|
if c.lg == nil { // In case connection is not retried, first time
|
||||||
for logGroupsOutput.NextToken != nil {
|
for logGroupsOutput.NextToken != nil {
|
||||||
logGroupsOutput, err = c.svc.DescribeLogGroups(
|
logGroupsOutput, err = c.svc.DescribeLogGroups(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
|
@ -228,7 +228,7 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
maxTime := time.Now().Add(maxFutureLogEventTimeOffset)
|
maxTime := time.Now().Add(maxFutureLogEventTimeOffset)
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
//Filtering metrics
|
// Filtering metrics
|
||||||
if m.Name() != c.LDMetricName {
|
if m.Name() != c.LDMetricName {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -281,7 +281,7 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check if message size is not fit to batch
|
// Check if message size is not fit to batch
|
||||||
if len(logData) > maxLogMessageLength {
|
if len(logData) > maxLogMessageLength {
|
||||||
metricStr := fmt.Sprintf("%v", m)
|
metricStr := fmt.Sprintf("%v", m)
|
||||||
c.Log.Errorf(
|
c.Log.Errorf(
|
||||||
|
|
@ -291,13 +291,13 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
//Batching log messages
|
// Batching log messages
|
||||||
//awsOverheadPerLogMessageBytes - is mandatory aws overhead per each log message
|
// awsOverheadPerLogMessageBytes - is mandatory aws overhead per each log message
|
||||||
messageSizeInBytesForAWS := len(logData) + awsOverheadPerLogMessageBytes
|
messageSizeInBytesForAWS := len(logData) + awsOverheadPerLogMessageBytes
|
||||||
|
|
||||||
//Pick up existing or prepare new log stream container.
|
// Pick up existing or prepare new log stream container.
|
||||||
//Log stream container stores logs per log stream in
|
// Log stream container stores logs per log stream in
|
||||||
//the AWS Cloudwatch logs API friendly structure
|
// the AWS Cloudwatch logs API friendly structure
|
||||||
if val, ok := c.ls[logStream]; ok {
|
if val, ok := c.ls[logStream]; ok {
|
||||||
lsContainer = val
|
lsContainer = val
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -308,7 +308,7 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
if lsContainer.currentBatchSizeBytes+messageSizeInBytesForAWS > maxBatchSizeBytes ||
|
if lsContainer.currentBatchSizeBytes+messageSizeInBytesForAWS > maxBatchSizeBytes ||
|
||||||
lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount >= maxItemsInBatch {
|
lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount >= maxItemsInBatch {
|
||||||
//Need to start new batch, and reset counters
|
// Need to start new batch, and reset counters
|
||||||
lsContainer.currentBatchIndex++
|
lsContainer.currentBatchIndex++
|
||||||
lsContainer.messageBatches = append(lsContainer.messageBatches,
|
lsContainer.messageBatches = append(lsContainer.messageBatches,
|
||||||
messageBatch{
|
messageBatch{
|
||||||
|
|
@ -320,10 +320,10 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount++
|
lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
//AWS need time in milliseconds. time.UnixNano() returns time in nanoseconds since epoch
|
// AWS need time in milliseconds. time.UnixNano() returns time in nanoseconds since epoch
|
||||||
//we store here TS with nanosec precision iun order to have proper ordering, later ts will be reduced to milliseconds
|
// we store here TS with nanosec precision iun order to have proper ordering, later ts will be reduced to milliseconds
|
||||||
metricTime := m.Time().UnixNano()
|
metricTime := m.Time().UnixNano()
|
||||||
//Adding metring to batch
|
// Adding metring to batch
|
||||||
lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents =
|
lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents =
|
||||||
append(lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents,
|
append(lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents,
|
||||||
types.InputLogEvent{
|
types.InputLogEvent{
|
||||||
|
|
@ -337,15 +337,15 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
if len(batch.logEvents) == 0 {
|
if len(batch.logEvents) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
//Sorting
|
// Sorting
|
||||||
sort.Slice(batch.logEvents[:], func(i, j int) bool {
|
sort.Slice(batch.logEvents[:], func(i, j int) bool {
|
||||||
return *batch.logEvents[i].Timestamp < *batch.logEvents[j].Timestamp
|
return *batch.logEvents[i].Timestamp < *batch.logEvents[j].Timestamp
|
||||||
})
|
})
|
||||||
|
|
||||||
putLogEvents := cloudwatchlogs.PutLogEventsInput{LogGroupName: &c.LogGroup, LogStreamName: &logStream}
|
putLogEvents := cloudwatchlogs.PutLogEventsInput{LogGroupName: &c.LogGroup, LogStreamName: &logStream}
|
||||||
if elem.sequenceToken == "" {
|
if elem.sequenceToken == "" {
|
||||||
//This is the first attempt to write to log stream,
|
// This is the first attempt to write to log stream,
|
||||||
//need to check log stream existence and create it if necessary
|
// need to check log stream existence and create it if necessary
|
||||||
describeLogStreamOutput, err := c.svc.DescribeLogStreams(context.Background(), &cloudwatchlogs.DescribeLogStreamsInput{
|
describeLogStreamOutput, err := c.svc.DescribeLogStreams(context.Background(), &cloudwatchlogs.DescribeLogStreamsInput{
|
||||||
LogGroupName: &c.LogGroup,
|
LogGroupName: &c.LogGroup,
|
||||||
LogStreamNamePrefix: &logStream})
|
LogStreamNamePrefix: &logStream})
|
||||||
|
|
@ -360,7 +360,7 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
putLogEvents.SequenceToken = nil
|
putLogEvents.SequenceToken = nil
|
||||||
} else if err == nil && len(describeLogStreamOutput.LogStreams) == 1 {
|
} else if err == nil && len(describeLogStreamOutput.LogStreams) == 1 {
|
||||||
putLogEvents.SequenceToken = describeLogStreamOutput.LogStreams[0].UploadSequenceToken
|
putLogEvents.SequenceToken = describeLogStreamOutput.LogStreams[0].UploadSequenceToken
|
||||||
} else if err == nil && len(describeLogStreamOutput.LogStreams) > 1 { //Ambiguity
|
} else if err == nil && len(describeLogStreamOutput.LogStreams) > 1 { // Ambiguity
|
||||||
c.Log.Errorf("More than 1 log stream found with prefix %q in log group %q.", logStream, c.LogGroup)
|
c.Log.Errorf("More than 1 log stream found with prefix %q in log group %q.", logStream, c.LogGroup)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -371,21 +371,21 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
putLogEvents.SequenceToken = &c.ls[logStream].sequenceToken
|
putLogEvents.SequenceToken = &c.ls[logStream].sequenceToken
|
||||||
}
|
}
|
||||||
|
|
||||||
//Upload log events
|
// Upload log events
|
||||||
//Adjusting TS to be in align with cloudwatch logs requirements
|
// Adjusting TS to be in align with cloudwatch logs requirements
|
||||||
for _, event := range batch.logEvents {
|
for _, event := range batch.logEvents {
|
||||||
*event.Timestamp = *event.Timestamp / 1000000
|
*event.Timestamp = *event.Timestamp / 1000000
|
||||||
}
|
}
|
||||||
putLogEvents.LogEvents = batch.logEvents
|
putLogEvents.LogEvents = batch.logEvents
|
||||||
|
|
||||||
//There is a quota of 5 requests per second per log stream. Additional
|
// There is a quota of 5 requests per second per log stream. Additional
|
||||||
//requests are throttled. This quota can't be changed.
|
// requests are throttled. This quota can't be changed.
|
||||||
putLogEventsOutput, err := c.svc.PutLogEvents(context.Background(), &putLogEvents)
|
putLogEventsOutput, err := c.svc.PutLogEvents(context.Background(), &putLogEvents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.Log.Errorf("Can't push logs batch to AWS. Reason: %v", err)
|
c.Log.Errorf("Can't push logs batch to AWS. Reason: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
//Cleanup batch
|
// Cleanup batch
|
||||||
elem.messageBatches[index] = messageBatch{
|
elem.messageBatches[index] = messageBatch{
|
||||||
logEvents: []types.InputLogEvent{},
|
logEvents: []types.InputLogEvent{},
|
||||||
messageCount: 0}
|
messageCount: 0}
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ func (c *mockCloudWatchLogs) PutLogEvents(
|
||||||
) (*cloudwatchlogsV2.PutLogEventsOutput, error) {
|
) (*cloudwatchlogsV2.PutLogEventsOutput, error) {
|
||||||
sequenceToken := "arbitraryToken"
|
sequenceToken := "arbitraryToken"
|
||||||
output := &cloudwatchlogsV2.PutLogEventsOutput{NextSequenceToken: &sequenceToken}
|
output := &cloudwatchlogsV2.PutLogEventsOutput{NextSequenceToken: &sequenceToken}
|
||||||
//Saving messages
|
// Saving messages
|
||||||
c.pushedLogEvents = append(c.pushedLogEvents, input.LogEvents...)
|
c.pushedLogEvents = append(c.pushedLogEvents, input.LogEvents...)
|
||||||
|
|
||||||
return output, nil
|
return output, nil
|
||||||
|
|
@ -257,7 +257,7 @@ func TestInit(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConnect(t *testing.T) {
|
func TestConnect(t *testing.T) {
|
||||||
//mock cloudwatch logs endpoint that is used only in plugin.Connect
|
// mock cloudwatch logs endpoint that is used only in plugin.Connect
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
fmt.Fprintln(w,
|
fmt.Fprintln(w,
|
||||||
`{
|
`{
|
||||||
|
|
@ -297,7 +297,7 @@ func TestConnect(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite(t *testing.T) {
|
func TestWrite(t *testing.T) {
|
||||||
//mock cloudwatch logs endpoint that is used only in plugin.Connect
|
// mock cloudwatch logs endpoint that is used only in plugin.Connect
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
fmt.Fprintln(w,
|
fmt.Fprintln(w,
|
||||||
`{
|
`{
|
||||||
|
|
@ -338,7 +338,7 @@ func TestWrite(t *testing.T) {
|
||||||
name string
|
name string
|
||||||
logStreamName string
|
logStreamName string
|
||||||
metrics []telegraf.Metric
|
metrics []telegraf.Metric
|
||||||
expectedMetricsOrder map[int]int //map[<index of pushed log event>]<index of corresponding metric>
|
expectedMetricsOrder map[int]int // map[<index of pushed log event>]<index of corresponding metric>
|
||||||
expectedMetricsCount int
|
expectedMetricsCount int
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
|
@ -470,7 +470,7 @@ func TestWrite(t *testing.T) {
|
||||||
},
|
},
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"container_id": "deadbeef",
|
"container_id": "deadbeef",
|
||||||
//Here comes very long message
|
// Here comes very long message
|
||||||
"message": RandStringBytes(maxLogMessageLength + 1),
|
"message": RandStringBytes(maxLogMessageLength + 1),
|
||||||
},
|
},
|
||||||
time.Now().Add(-time.Minute),
|
time.Now().Add(-time.Minute),
|
||||||
|
|
@ -494,7 +494,7 @@ func TestWrite(t *testing.T) {
|
||||||
},
|
},
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"container_id": "deadbeef",
|
"container_id": "deadbeef",
|
||||||
//Here comes very long message to cause message batching
|
// Here comes very long message to cause message batching
|
||||||
"message": "batch1 message1:" + RandStringBytes(maxLogMessageLength-16),
|
"message": "batch1 message1:" + RandStringBytes(maxLogMessageLength-16),
|
||||||
},
|
},
|
||||||
time.Now().Add(-4*time.Minute),
|
time.Now().Add(-4*time.Minute),
|
||||||
|
|
@ -510,7 +510,7 @@ func TestWrite(t *testing.T) {
|
||||||
},
|
},
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"container_id": "deadbeef",
|
"container_id": "deadbeef",
|
||||||
//Here comes very long message to cause message batching
|
// Here comes very long message to cause message batching
|
||||||
"message": "batch1 message2:" + RandStringBytes(maxLogMessageLength-16),
|
"message": "batch1 message2:" + RandStringBytes(maxLogMessageLength-16),
|
||||||
},
|
},
|
||||||
time.Now().Add(-3*time.Minute),
|
time.Now().Add(-3*time.Minute),
|
||||||
|
|
@ -526,7 +526,7 @@ func TestWrite(t *testing.T) {
|
||||||
},
|
},
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"container_id": "deadbeef",
|
"container_id": "deadbeef",
|
||||||
//Here comes very long message to cause message batching
|
// Here comes very long message to cause message batching
|
||||||
"message": "batch1 message3:" + RandStringBytes(maxLogMessageLength-16),
|
"message": "batch1 message3:" + RandStringBytes(maxLogMessageLength-16),
|
||||||
},
|
},
|
||||||
time.Now().Add(-2*time.Minute),
|
time.Now().Add(-2*time.Minute),
|
||||||
|
|
@ -542,7 +542,7 @@ func TestWrite(t *testing.T) {
|
||||||
},
|
},
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"container_id": "deadbeef",
|
"container_id": "deadbeef",
|
||||||
//Here comes very long message to cause message batching
|
// Here comes very long message to cause message batching
|
||||||
"message": "batch1 message4:" + RandStringBytes(maxLogMessageLength-16),
|
"message": "batch1 message4:" + RandStringBytes(maxLogMessageLength-16),
|
||||||
},
|
},
|
||||||
time.Now().Add(-time.Minute),
|
time.Now().Add(-time.Minute),
|
||||||
|
|
@ -568,7 +568,7 @@ func TestWrite(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
//Overwrite cloud watch log endpoint
|
// Overwrite cloud watch log endpoint
|
||||||
mockCwl := &mockCloudWatchLogs{}
|
mockCwl := &mockCloudWatchLogs{}
|
||||||
mockCwl.Init(tt.logStreamName)
|
mockCwl.Init(tt.logStreamName)
|
||||||
plugin.svc = mockCwl
|
plugin.svc = mockCwl
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "github.com/jackc/pgx/v4/stdlib" //to register stdlib from PostgreSQL Driver and Toolkit
|
_ "github.com/jackc/pgx/v4/stdlib" // to register stdlib from PostgreSQL Driver and Toolkit
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
|
|
||||||
|
|
@ -254,7 +254,7 @@ func (a *Elasticsearch) Connect() error {
|
||||||
// GetPointID generates a unique ID for a Metric Point
|
// GetPointID generates a unique ID for a Metric Point
|
||||||
func GetPointID(m telegraf.Metric) string {
|
func GetPointID(m telegraf.Metric) string {
|
||||||
var buffer bytes.Buffer
|
var buffer bytes.Buffer
|
||||||
//Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID
|
// Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID
|
||||||
|
|
||||||
buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
|
buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
|
||||||
buffer.WriteString(m.Name())
|
buffer.WriteString(m.Name())
|
||||||
|
|
|
||||||
|
|
@ -146,7 +146,7 @@ func TestGraphiteOK(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2(t, &wg2)
|
TCPServer2(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
// Write but expect an error, but reconnect
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
|
@ -252,7 +252,7 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2(t, &wg2)
|
TCPServer2(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
// Write but expect an error, but reconnect
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
|
@ -316,7 +316,7 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2(t, &wg2)
|
TCPServer2(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
// Write but expect an error, but reconnect
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
|
@ -384,7 +384,7 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2WithMultipleTemplates(t, &wg2)
|
TCPServer2WithMultipleTemplates(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
// Write but expect an error, but reconnect
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
|
@ -448,7 +448,7 @@ func TestGraphiteOkWithTags(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2WithTags(t, &wg2)
|
TCPServer2WithTags(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
// Write but expect an error, but reconnect
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
|
@ -513,7 +513,7 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2WithTags(t, &wg2)
|
TCPServer2WithTags(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
// Write but expect an error, but reconnect
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
|
@ -578,7 +578,7 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
|
||||||
// Start TCP server
|
// Start TCP server
|
||||||
wg2.Add(1)
|
wg2.Add(1)
|
||||||
TCPServer2WithTagsSeparatorUnderscore(t, &wg2)
|
TCPServer2WithTagsSeparatorUnderscore(t, &wg2)
|
||||||
//Write but expect an error, but reconnect
|
// Write but expect an error, but reconnect
|
||||||
err3 := g.Write(metrics2)
|
err3 := g.Write(metrics2)
|
||||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -393,7 +393,7 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//checks for any 4xx code and drops metric and retrying will not make the request work
|
// checks for any 4xx code and drops metric and retrying will not make the request work
|
||||||
if len(resp.Status) > 0 && resp.Status[0] == '4' {
|
if len(resp.Status) > 0 && resp.Status[0] == '4' {
|
||||||
c.log.Errorf("E! [outputs.influxdb] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
|
c.log.Errorf("E! [outputs.influxdb] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -279,19 +279,19 @@ func TestTagSanitization(t *testing.T) {
|
||||||
expected []string
|
expected []string
|
||||||
input []string
|
input []string
|
||||||
}{
|
}{
|
||||||
{ //don't sanitize tags containing UnsopportedCharacter on IoTDB V1.3
|
{ // don't sanitize tags containing UnsopportedCharacter on IoTDB V1.3
|
||||||
name: "Don't Sanitize Tags",
|
name: "Don't Sanitize Tags",
|
||||||
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
|
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
|
||||||
expected: []string{"word", "`word`", "word_"},
|
expected: []string{"word", "`word`", "word_"},
|
||||||
input: []string{"word", "`word`", "word_"},
|
input: []string{"word", "`word`", "word_"},
|
||||||
},
|
},
|
||||||
{ //sanitize tags containing UnsopportedCharacter on IoTDB V1.3 enclosing them in backticks
|
{ // sanitize tags containing UnsupportedCharacter on IoTDB V1.3 enclosing them in backticks
|
||||||
name: "Sanitize Tags",
|
name: "Sanitize Tags",
|
||||||
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
|
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
|
||||||
expected: []string{"`wo rd`", "`@`", "`$`", "`#`", "`:`", "`{`", "`}`", "`1`", "`1234`"},
|
expected: []string{"`wo rd`", "`@`", "`$`", "`#`", "`:`", "`{`", "`}`", "`1`", "`1234`"},
|
||||||
input: []string{"wo rd", "@", "$", "#", ":", "{", "}", "1", "1234"},
|
input: []string{"wo rd", "@", "$", "#", ":", "{", "}", "1", "1234"},
|
||||||
},
|
},
|
||||||
{ //test on forbidden word and forbidden syntax
|
{ // test on forbidden word and forbidden syntax
|
||||||
name: "Errors",
|
name: "Errors",
|
||||||
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
|
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "1.3"; return s }(),
|
||||||
expected: []string{"", ""},
|
expected: []string{"", ""},
|
||||||
|
|
@ -303,13 +303,13 @@ func TestTagSanitization(t *testing.T) {
|
||||||
expected: []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
|
expected: []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
|
||||||
input: []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
|
input: []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
|
||||||
},
|
},
|
||||||
{ //sanitize tags containing UnsopportedCharacter on IoTDB V0.13 enclosing them in backticks
|
{ // sanitize tags containing UnsupportedCharacter on IoTDB V0.13 enclosing them in backticks
|
||||||
name: "Sanitize Tags",
|
name: "Sanitize Tags",
|
||||||
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "0.13"; return s }(),
|
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "0.13"; return s }(),
|
||||||
expected: []string{"`wo rd`", "`\\`"},
|
expected: []string{"`wo rd`", "`\\`"},
|
||||||
input: []string{"wo rd", "\\"},
|
input: []string{"wo rd", "\\"},
|
||||||
},
|
},
|
||||||
{ //test on forbidden word and forbidden syntax on IoTDB V0.13
|
{ // test on forbidden word and forbidden syntax on IoTDB V0.13
|
||||||
name: "Errors",
|
name: "Errors",
|
||||||
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "0.13"; return s }(),
|
plugin: func() *IoTDB { s := newIoTDB(); s.SanitizeTags = "0.13"; return s }(),
|
||||||
expected: []string{"", ""},
|
expected: []string{"", ""},
|
||||||
|
|
@ -345,7 +345,7 @@ func TestTagsHandling(t *testing.T) {
|
||||||
expected recordsWithTags
|
expected recordsWithTags
|
||||||
input recordsWithTags
|
input recordsWithTags
|
||||||
}{
|
}{
|
||||||
{ //treat tags as fields. And input Tags are NOT in order.
|
{ // treat tags as fields. And input Tags are NOT in order.
|
||||||
name: "treat tags as fields",
|
name: "treat tags as fields",
|
||||||
plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "fields"; return s }(),
|
plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "fields"; return s }(),
|
||||||
expected: recordsWithTags{
|
expected: recordsWithTags{
|
||||||
|
|
@ -375,7 +375,7 @@ func TestTagsHandling(t *testing.T) {
|
||||||
}},
|
}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{ //treat tags as device IDs. And input Tags are in order.
|
{ // treat tags as device IDs. And input Tags are in order.
|
||||||
name: "treat tags as device IDs",
|
name: "treat tags as device IDs",
|
||||||
plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "device_id"; return s }(),
|
plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "device_id"; return s }(),
|
||||||
expected: recordsWithTags{
|
expected: recordsWithTags{
|
||||||
|
|
|
||||||
|
|
@ -87,11 +87,11 @@ func (s *MongoDB) Init() error {
|
||||||
if !strings.HasPrefix(s.Dsn, "mongodb://") && !strings.HasPrefix(s.Dsn, "mongodb+srv://") {
|
if !strings.HasPrefix(s.Dsn, "mongodb://") && !strings.HasPrefix(s.Dsn, "mongodb+srv://") {
|
||||||
return errors.New("invalid connection string. expected mongodb://host:port/?{options} or mongodb+srv://host:port/?{options}")
|
return errors.New("invalid connection string. expected mongodb://host:port/?{options} or mongodb+srv://host:port/?{options}")
|
||||||
}
|
}
|
||||||
if !strings.Contains(s.Dsn[strings.Index(s.Dsn, "://")+3:], "/") { //append '/' to Dsn if its missing
|
if !strings.Contains(s.Dsn[strings.Index(s.Dsn, "://")+3:], "/") { // append '/' to Dsn if its missing
|
||||||
s.Dsn = s.Dsn + "/"
|
s.Dsn = s.Dsn + "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
serverAPIOptions := options.ServerAPI(options.ServerAPIVersion1) //use new mongodb versioned api
|
serverAPIOptions := options.ServerAPI(options.ServerAPIVersion1) // use new mongodb versioned api
|
||||||
s.clientOptions = options.Client().SetServerAPIOptions(serverAPIOptions)
|
s.clientOptions = options.Client().SetServerAPIOptions(serverAPIOptions)
|
||||||
|
|
||||||
switch s.AuthenticationType {
|
switch s.AuthenticationType {
|
||||||
|
|
@ -120,7 +120,7 @@ func (s *MongoDB) Init() error {
|
||||||
password.Destroy()
|
password.Destroy()
|
||||||
s.clientOptions.SetAuth(credential)
|
s.clientOptions.SetAuth(credential)
|
||||||
case "X509":
|
case "X509":
|
||||||
//format connection string to include tls/x509 options
|
// format connection string to include tls/x509 options
|
||||||
newConnectionString, err := url.Parse(s.Dsn)
|
newConnectionString, err := url.Parse(s.Dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -142,7 +142,7 @@ func (nr *NewRelic) Write(metrics []telegraf.Metric) error {
|
||||||
// using HarvestNow.
|
// using HarvestNow.
|
||||||
nr.harvestor.HarvestNow(context.Background())
|
nr.harvestor.HarvestNow(context.Background())
|
||||||
|
|
||||||
//Check if we encountered errors
|
// Check if we encountered errors
|
||||||
if nr.errorCount != 0 {
|
if nr.errorCount != 0 {
|
||||||
return fmt.Errorf("unable to harvest metrics %s ", nr.savedErrors[nr.errorCount])
|
return fmt.Errorf("unable to harvest metrics %s ", nr.savedErrors[nr.errorCount])
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -336,7 +336,7 @@ func isTempError(err error) bool {
|
||||||
errClass := pgErr.Code[:2]
|
errClass := pgErr.Code[:2]
|
||||||
switch errClass {
|
switch errClass {
|
||||||
case "23": // Integrity Constraint Violation
|
case "23": // Integrity Constraint Violation
|
||||||
//23505 - unique_violation
|
// 23505 - unique_violation
|
||||||
if pgErr.Code == "23505" && strings.Contains(err.Error(), "pg_type_typname_nsp_index") {
|
if pgErr.Code == "23505" && strings.Contains(err.Error(), "pg_type_typname_nsp_index") {
|
||||||
// Happens when you try to create 2 tables simultaneously.
|
// Happens when you try to create 2 tables simultaneously.
|
||||||
return true
|
return true
|
||||||
|
|
|
||||||
|
|
@ -102,7 +102,7 @@ func (sw *SocketWriter) Connect() error {
|
||||||
if err := sw.setKeepAlive(c); err != nil {
|
if err := sw.setKeepAlive(c); err != nil {
|
||||||
sw.Log.Debugf("Unable to configure keep alive (%s): %s", sw.Address, err)
|
sw.Log.Debugf("Unable to configure keep alive (%s): %s", sw.Address, err)
|
||||||
}
|
}
|
||||||
//set encoder
|
// set encoder
|
||||||
sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding)
|
sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -154,7 +154,7 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := sw.Conn.Write(bs); err != nil {
|
if _, err := sw.Conn.Write(bs); err != nil {
|
||||||
//TODO log & keep going with remaining strings
|
// TODO log & keep going with remaining strings
|
||||||
var netErr net.Error
|
var netErr net.Error
|
||||||
if errors.As(err, &netErr) {
|
if errors.As(err, &netErr) {
|
||||||
// permanent error. close the connection
|
// permanent error. close the connection
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
//Register sql drivers
|
// Register sql drivers
|
||||||
_ "github.com/ClickHouse/clickhouse-go" // clickhouse
|
_ "github.com/ClickHouse/clickhouse-go" // clickhouse
|
||||||
_ "github.com/go-sql-driver/mysql" // mysql
|
_ "github.com/go-sql-driver/mysql" // mysql
|
||||||
_ "github.com/jackc/pgx/v4/stdlib" // pgx (postgres)
|
_ "github.com/jackc/pgx/v4/stdlib" // pgx (postgres)
|
||||||
|
|
|
||||||
|
|
@ -135,7 +135,7 @@ var (
|
||||||
},
|
},
|
||||||
ts,
|
ts,
|
||||||
),
|
),
|
||||||
stableMetric( //test spaces in metric, tag, and field names
|
stableMetric( // test spaces in metric, tag, and field names
|
||||||
"metric three",
|
"metric three",
|
||||||
[]telegraf.Tag{
|
[]telegraf.Tag{
|
||||||
{
|
{
|
||||||
|
|
@ -192,7 +192,7 @@ func TestMysqlIntegration(t *testing.T) {
|
||||||
require.NoError(t, err, "failed to start container")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer container.Terminate()
|
defer container.Terminate()
|
||||||
|
|
||||||
//use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
|
||||||
username, password, container.Address, container.Ports[servicePort], dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
)
|
)
|
||||||
|
|
@ -275,7 +275,7 @@ func TestPostgresIntegration(t *testing.T) {
|
||||||
require.NoError(t, err, "failed to start container")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer container.Terminate()
|
defer container.Terminate()
|
||||||
|
|
||||||
//use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v",
|
||||||
username, password, container.Address, container.Ports[servicePort], dbname,
|
username, password, container.Address, container.Ports[servicePort], dbname,
|
||||||
|
|
@ -302,10 +302,10 @@ func TestPostgresIntegration(t *testing.T) {
|
||||||
"-c",
|
"-c",
|
||||||
"pg_dump" +
|
"pg_dump" +
|
||||||
" --username=" + username +
|
" --username=" + username +
|
||||||
//" --password=" + password +
|
// " --password=" + password +
|
||||||
// " --compact --skip-opt " +
|
// " --compact --skip-opt " +
|
||||||
" --no-comments" +
|
" --no-comments" +
|
||||||
//" --data-only" +
|
// " --data-only" +
|
||||||
" " + dbname +
|
" " + dbname +
|
||||||
// pg_dump's output has comments that include build info
|
// pg_dump's output has comments that include build info
|
||||||
// of postgres and pg_dump. The build info changes with
|
// of postgres and pg_dump. The build info changes with
|
||||||
|
|
@ -358,7 +358,7 @@ func TestClickHouseIntegration(t *testing.T) {
|
||||||
require.NoError(t, err, "failed to start container")
|
require.NoError(t, err, "failed to start container")
|
||||||
defer container.Terminate()
|
defer container.Terminate()
|
||||||
|
|
||||||
//use the plugin to write to the database
|
// use the plugin to write to the database
|
||||||
// host, port, username, password, dbname
|
// host, port, username, password, dbname
|
||||||
address := fmt.Sprintf("tcp://%v:%v?username=%v&database=%v",
|
address := fmt.Sprintf("tcp://%v:%v?username=%v&database=%v",
|
||||||
container.Address, container.Ports[servicePort], username, dbname)
|
container.Address, container.Ports[servicePort], username, dbname)
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ func TestSqlite(t *testing.T) {
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
require.NoError(t, p.Write(testMetrics))
|
require.NoError(t, p.Write(testMetrics))
|
||||||
|
|
||||||
//read directly from the database
|
// read directly from the database
|
||||||
db, err := gosql.Open("sqlite", address)
|
db, err := gosql.Open("sqlite", address)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
@ -74,7 +74,7 @@ func TestSqlite(t *testing.T) {
|
||||||
// sqlite stores dates as strings. They may be in the local
|
// sqlite stores dates as strings. They may be in the local
|
||||||
// timezone. The test needs to parse them back into a time.Time to
|
// timezone. The test needs to parse them back into a time.Time to
|
||||||
// check them.
|
// check them.
|
||||||
//timeLayout := "2006-01-02 15:04:05 -0700 MST"
|
// timeLayout := "2006-01-02 15:04:05 -0700 MST"
|
||||||
timeLayout := "2006-01-02T15:04:05Z"
|
timeLayout := "2006-01-02T15:04:05Z"
|
||||||
var actualTime time.Time
|
var actualTime time.Time
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,7 @@ func (sm *SyslogMapper) mapAppname(metric telegraf.Metric, msg *rfc5424.SyslogMe
|
||||||
if value, ok := metric.GetTag("appname"); ok {
|
if value, ok := metric.GetTag("appname"); ok {
|
||||||
msg.SetAppname(formatValue(value))
|
msg.SetAppname(formatValue(value))
|
||||||
} else {
|
} else {
|
||||||
//Use default appname
|
// Use default appname
|
||||||
msg.SetAppname(sm.DefaultAppname)
|
msg.SetAppname(sm.DefaultAppname)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
}
|
}
|
||||||
require.Contains(t, incorrectMappingMode.Connect().Error(), "single-table")
|
require.Contains(t, incorrectMappingMode.Connect().Error(), "single-table")
|
||||||
|
|
||||||
//multi-measure config validation multi table mode
|
// multi-measure config validation multi table mode
|
||||||
validConfigMultiMeasureMultiTableMode := Timestream{
|
validConfigMultiMeasureMultiTableMode := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDbName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
|
|
@ -687,7 +687,7 @@ func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
|
||||||
input1 := testutil.MustMetric(
|
input1 := testutil.MustMetric(
|
||||||
metricName1,
|
metricName1,
|
||||||
map[string]string{"tag1": "value1"},
|
map[string]string{"tag1": "value1"},
|
||||||
map[string]interface{}{}, //no fields here
|
map[string]interface{}{}, // no fields here
|
||||||
time1,
|
time1,
|
||||||
)
|
)
|
||||||
input2 := testutil.MustMetric(
|
input2 := testutil.MustMetric(
|
||||||
|
|
@ -700,7 +700,7 @@ func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
|
||||||
)
|
)
|
||||||
input3 := testutil.MustMetric(
|
input3 := testutil.MustMetric(
|
||||||
metricName1,
|
metricName1,
|
||||||
map[string]string{}, //record with no dimensions should appear in the results
|
map[string]string{}, // record with no dimensions should appear in the results
|
||||||
map[string]interface{}{
|
map[string]interface{}{
|
||||||
"value": float64(20),
|
"value": float64(20),
|
||||||
},
|
},
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue