chore: Fix linter findings for `revive:enforce-slice-style` in `plugins/outputs` (#16032)
This commit is contained in:
parent
aaae84b67b
commit
c4bce2d211
|
|
@ -60,10 +60,9 @@ func (a *Amon) Write(metrics []telegraf.Metric) error {
|
||||||
if len(metrics) == 0 {
|
if len(metrics) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ts := TimeSeries{}
|
|
||||||
tempSeries := []*Metric{}
|
|
||||||
metricCounter := 0
|
|
||||||
|
|
||||||
|
metricCounter := 0
|
||||||
|
tempSeries := make([]*Metric, 0, len(metrics))
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
mname := strings.ReplaceAll(m.Name(), "_", ".")
|
mname := strings.ReplaceAll(m.Name(), "_", ".")
|
||||||
if amonPts, err := buildMetrics(m); err == nil {
|
if amonPts, err := buildMetrics(m); err == nil {
|
||||||
|
|
@ -80,6 +79,7 @@ func (a *Amon) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ts := TimeSeries{}
|
||||||
ts.Series = make([]*Metric, metricCounter)
|
ts.Series = make([]*Metric, metricCounter)
|
||||||
copy(ts.Series, tempSeries[0:])
|
copy(ts.Series, tempSeries[0:])
|
||||||
tsBytes, err := json.Marshal(ts)
|
tsBytes, err := json.Marshal(ts)
|
||||||
|
|
|
||||||
|
|
@ -297,8 +297,7 @@ func TestTimeout(t *testing.T) {
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics := []telegraf.Metric{}
|
err := clfy.Write(nil)
|
||||||
err := clfy.Write(metrics)
|
|
||||||
require.ErrorIs(t, err, errTimeout)
|
require.ErrorIs(t, err, errTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -146,12 +146,12 @@ func TestPartitionDatums(t *testing.T) {
|
||||||
Value: aws.Float64(1),
|
Value: aws.Float64(1),
|
||||||
}
|
}
|
||||||
|
|
||||||
zeroDatum := []types.MetricDatum{}
|
zeroDatum := make([]types.MetricDatum, 0)
|
||||||
oneDatum := []types.MetricDatum{testDatum}
|
oneDatum := []types.MetricDatum{testDatum}
|
||||||
twoDatum := []types.MetricDatum{testDatum, testDatum}
|
twoDatum := []types.MetricDatum{testDatum, testDatum}
|
||||||
threeDatum := []types.MetricDatum{testDatum, testDatum, testDatum}
|
threeDatum := []types.MetricDatum{testDatum, testDatum, testDatum}
|
||||||
|
|
||||||
require.Equal(t, [][]types.MetricDatum{}, PartitionDatums(2, zeroDatum))
|
require.Empty(t, PartitionDatums(2, zeroDatum))
|
||||||
require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
||||||
require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
|
||||||
require.Equal(t, [][]types.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum))
|
require.Equal(t, [][]types.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum))
|
||||||
|
|
|
||||||
|
|
@ -302,7 +302,7 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
lsContainer = val
|
lsContainer = val
|
||||||
} else {
|
} else {
|
||||||
lsContainer.messageBatches[0].messageCount = 0
|
lsContainer.messageBatches[0].messageCount = 0
|
||||||
lsContainer.messageBatches[0].logEvents = []types.InputLogEvent{}
|
lsContainer.messageBatches[0].logEvents = make([]types.InputLogEvent, 0)
|
||||||
c.ls[logStream] = lsContainer
|
c.ls[logStream] = lsContainer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -312,8 +312,9 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
lsContainer.currentBatchIndex++
|
lsContainer.currentBatchIndex++
|
||||||
lsContainer.messageBatches = append(lsContainer.messageBatches,
|
lsContainer.messageBatches = append(lsContainer.messageBatches,
|
||||||
messageBatch{
|
messageBatch{
|
||||||
logEvents: []types.InputLogEvent{},
|
messageCount: 0,
|
||||||
messageCount: 0})
|
},
|
||||||
|
)
|
||||||
lsContainer.currentBatchSizeBytes = messageSizeInBytesForAWS
|
lsContainer.currentBatchSizeBytes = messageSizeInBytesForAWS
|
||||||
} else {
|
} else {
|
||||||
lsContainer.currentBatchSizeBytes += messageSizeInBytesForAWS
|
lsContainer.currentBatchSizeBytes += messageSizeInBytesForAWS
|
||||||
|
|
@ -387,8 +388,8 @@ func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
// Cleanup batch
|
// Cleanup batch
|
||||||
elem.messageBatches[index] = messageBatch{
|
elem.messageBatches[index] = messageBatch{
|
||||||
logEvents: []types.InputLogEvent{},
|
messageCount: 0,
|
||||||
messageCount: 0}
|
}
|
||||||
|
|
||||||
elem.sequenceToken = *putLogEventsOutput.NextSequenceToken
|
elem.sequenceToken = *putLogEventsOutput.NextSequenceToken
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -77,8 +77,7 @@ func (d *Datadog) Connect() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Datadog) convertToDatadogMetric(metrics []telegraf.Metric) []*Metric {
|
func (d *Datadog) convertToDatadogMetric(metrics []telegraf.Metric) []*Metric {
|
||||||
tempSeries := []*Metric{}
|
tempSeries := make([]*Metric, 0, len(metrics))
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
if dogMs, err := buildMetrics(m); err == nil {
|
if dogMs, err := buildMetrics(m); err == nil {
|
||||||
metricTags := buildTags(m.TagList())
|
metricTags := buildTags(m.TagList())
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,7 @@ func TestBuildTags(t *testing.T) {
|
||||||
outTags []string
|
outTags []string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
[]*telegraf.Tag{
|
ptIn: []*telegraf.Tag{
|
||||||
{
|
{
|
||||||
Key: "one",
|
Key: "one",
|
||||||
Value: "two",
|
Value: "two",
|
||||||
|
|
@ -109,20 +109,20 @@ func TestBuildTags(t *testing.T) {
|
||||||
Value: "four",
|
Value: "four",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
[]string{"one:two", "three:four"},
|
outTags: []string{"one:two", "three:four"},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]*telegraf.Tag{
|
ptIn: []*telegraf.Tag{
|
||||||
{
|
{
|
||||||
Key: "aaa",
|
Key: "aaa",
|
||||||
Value: "bbb",
|
Value: "bbb",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
[]string{"aaa:bbb"},
|
outTags: []string{"aaa:bbb"},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]*telegraf.Tag{},
|
ptIn: make([]*telegraf.Tag, 0),
|
||||||
[]string{},
|
outTags: make([]string, 0),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tagtests {
|
for _, tt := range tagtests {
|
||||||
|
|
|
||||||
|
|
@ -67,10 +67,9 @@ func (d *Dynatrace) Write(metrics []telegraf.Metric) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lines := []string{}
|
lines := make([]string, 0, len(metrics))
|
||||||
|
|
||||||
for _, tm := range metrics {
|
for _, tm := range metrics {
|
||||||
dims := []dimensions.Dimension{}
|
dims := make([]dimensions.Dimension, 0, len(tm.TagList()))
|
||||||
for _, tag := range tm.TagList() {
|
for _, tag := range tm.TagList() {
|
||||||
// Ignore special tags for histogram and summary types.
|
// Ignore special tags for histogram and summary types.
|
||||||
switch tm.Type() {
|
switch tm.Type() {
|
||||||
|
|
@ -211,7 +210,7 @@ func (d *Dynatrace) Init() error {
|
||||||
Timeout: time.Duration(d.Timeout),
|
Timeout: time.Duration(d.Timeout),
|
||||||
}
|
}
|
||||||
|
|
||||||
dims := []dimensions.Dimension{}
|
dims := make([]dimensions.Dimension, 0, len(d.DefaultDimensions))
|
||||||
for key, value := range d.DefaultDimensions {
|
for key, value := range d.DefaultDimensions {
|
||||||
dims = append(dims, dimensions.NewDimension(key, value))
|
dims = append(dims, dimensions.NewDimension(key, value))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -66,8 +66,7 @@ func TestEmptyMetricsSlice(t *testing.T) {
|
||||||
|
|
||||||
err = d.Connect()
|
err = d.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
empty := []telegraf.Metric{}
|
err = d.Write(nil)
|
||||||
err = d.Write(empty)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -127,7 +126,7 @@ func TestMissingAPIToken(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendMetrics(t *testing.T) {
|
func TestSendMetrics(t *testing.T) {
|
||||||
expected := []string{}
|
var expected []string
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// check the encoded result
|
// check the encoded result
|
||||||
|
|
@ -152,10 +151,9 @@ func TestSendMetrics(t *testing.T) {
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
d := &Dynatrace{
|
d := &Dynatrace{
|
||||||
URL: ts.URL,
|
URL: ts.URL,
|
||||||
APIToken: config.NewSecret([]byte("123")),
|
APIToken: config.NewSecret([]byte("123")),
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
AddCounterMetrics: []string{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := d.Init()
|
err := d.Init()
|
||||||
|
|
@ -214,7 +212,7 @@ func TestSendMetrics(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSendMetricsWithPatterns(t *testing.T) {
|
func TestSendMetricsWithPatterns(t *testing.T) {
|
||||||
expected := []string{}
|
var expected []string
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// check the encoded result
|
// check the encoded result
|
||||||
|
|
@ -239,11 +237,9 @@ func TestSendMetricsWithPatterns(t *testing.T) {
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
d := &Dynatrace{
|
d := &Dynatrace{
|
||||||
URL: ts.URL,
|
URL: ts.URL,
|
||||||
APIToken: config.NewSecret([]byte("123")),
|
APIToken: config.NewSecret([]byte("123")),
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
AddCounterMetrics: []string{},
|
|
||||||
AddCounterMetricsPatterns: []string{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := d.Init()
|
err := d.Init()
|
||||||
|
|
|
||||||
|
|
@ -425,7 +425,7 @@ func (a *Elasticsearch) createNewTemplate(templatePattern string) (*bytes.Buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
|
func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
|
||||||
tagKeys := []string{}
|
tagKeys := make([]string, 0)
|
||||||
startTag := strings.Index(indexName, "{{")
|
startTag := strings.Index(indexName, "{{")
|
||||||
|
|
||||||
for startTag >= 0 {
|
for startTag >= 0 {
|
||||||
|
|
@ -464,8 +464,7 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagK
|
||||||
indexName = dateReplacer.Replace(indexName)
|
indexName = dateReplacer.Replace(indexName)
|
||||||
}
|
}
|
||||||
|
|
||||||
tagValues := []interface{}{}
|
tagValues := make([]interface{}, 0, len(tagKeys))
|
||||||
|
|
||||||
for _, key := range tagKeys {
|
for _, key := range tagKeys {
|
||||||
if value, ok := metricTags[key]; ok {
|
if value, ok := metricTags[key]; ok {
|
||||||
tagValues = append(tagValues, value)
|
tagValues = append(tagValues, value)
|
||||||
|
|
|
||||||
|
|
@ -414,41 +414,41 @@ func TestGetTagKeys(t *testing.T) {
|
||||||
ExpectedTagKeys []string
|
ExpectedTagKeys []string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
"indexname",
|
IndexName: "indexname",
|
||||||
"indexname",
|
ExpectedIndexName: "indexname",
|
||||||
[]string{},
|
ExpectedTagKeys: make([]string, 0),
|
||||||
}, {
|
}, {
|
||||||
"indexname-%Y",
|
IndexName: "indexname-%Y",
|
||||||
"indexname-%Y",
|
ExpectedIndexName: "indexname-%Y",
|
||||||
[]string{},
|
ExpectedTagKeys: make([]string, 0),
|
||||||
}, {
|
}, {
|
||||||
"indexname-%Y-%m",
|
IndexName: "indexname-%Y-%m",
|
||||||
"indexname-%Y-%m",
|
ExpectedIndexName: "indexname-%Y-%m",
|
||||||
[]string{},
|
ExpectedTagKeys: make([]string, 0),
|
||||||
}, {
|
}, {
|
||||||
"indexname-%Y-%m-%d",
|
IndexName: "indexname-%Y-%m-%d",
|
||||||
"indexname-%Y-%m-%d",
|
ExpectedIndexName: "indexname-%Y-%m-%d",
|
||||||
[]string{},
|
ExpectedTagKeys: make([]string, 0),
|
||||||
}, {
|
}, {
|
||||||
"indexname-%Y-%m-%d-%H",
|
IndexName: "indexname-%Y-%m-%d-%H",
|
||||||
"indexname-%Y-%m-%d-%H",
|
ExpectedIndexName: "indexname-%Y-%m-%d-%H",
|
||||||
[]string{},
|
ExpectedTagKeys: make([]string, 0),
|
||||||
}, {
|
}, {
|
||||||
"indexname-%y-%m",
|
IndexName: "indexname-%y-%m",
|
||||||
"indexname-%y-%m",
|
ExpectedIndexName: "indexname-%y-%m",
|
||||||
[]string{},
|
ExpectedTagKeys: make([]string, 0),
|
||||||
}, {
|
}, {
|
||||||
"indexname-{{tag1}}-%y-%m",
|
IndexName: "indexname-{{tag1}}-%y-%m",
|
||||||
"indexname-%s-%y-%m",
|
ExpectedIndexName: "indexname-%s-%y-%m",
|
||||||
[]string{"tag1"},
|
ExpectedTagKeys: []string{"tag1"},
|
||||||
}, {
|
}, {
|
||||||
"indexname-{{tag1}}-{{tag2}}-%y-%m",
|
IndexName: "indexname-{{tag1}}-{{tag2}}-%y-%m",
|
||||||
"indexname-%s-%s-%y-%m",
|
ExpectedIndexName: "indexname-%s-%s-%y-%m",
|
||||||
[]string{"tag1", "tag2"},
|
ExpectedTagKeys: []string{"tag1", "tag2"},
|
||||||
}, {
|
}, {
|
||||||
"indexname-{{tag1}}-{{tag2}}-{{tag3}}-%y-%m",
|
IndexName: "indexname-{{tag1}}-{{tag2}}-{{tag3}}-%y-%m",
|
||||||
"indexname-%s-%s-%s-%y-%m",
|
ExpectedIndexName: "indexname-%s-%s-%s-%y-%m",
|
||||||
[]string{"tag1", "tag2", "tag3"},
|
ExpectedTagKeys: []string{"tag1", "tag2", "tag3"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
|
@ -476,74 +476,67 @@ func TestGetIndexName(t *testing.T) {
|
||||||
Expected string
|
Expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
IndexName: "indexname",
|
||||||
"indexname",
|
Expected: "indexname",
|
||||||
"indexname",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
IndexName: "indexname-%Y",
|
||||||
"indexname-%Y",
|
Expected: "indexname-2014",
|
||||||
"indexname-2014",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
IndexName: "indexname-%Y-%m",
|
||||||
"indexname-%Y-%m",
|
Expected: "indexname-2014-12",
|
||||||
"indexname-2014-12",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
IndexName: "indexname-%Y-%m-%d",
|
||||||
"indexname-%Y-%m-%d",
|
Expected: "indexname-2014-12-01",
|
||||||
"indexname-2014-12-01",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
IndexName: "indexname-%Y-%m-%d-%H",
|
||||||
"indexname-%Y-%m-%d-%H",
|
Expected: "indexname-2014-12-01-23",
|
||||||
"indexname-2014-12-01-23",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
IndexName: "indexname-%y-%m",
|
||||||
"indexname-%y-%m",
|
Expected: "indexname-14-12",
|
||||||
"indexname-14-12",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
IndexName: "indexname-%Y-%V",
|
||||||
"indexname-%Y-%V",
|
Expected: "indexname-2014-49",
|
||||||
"indexname-2014-49",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{"tag1"},
|
TagKeys: []string{"tag1"},
|
||||||
"indexname-%s-%y-%m",
|
IndexName: "indexname-%s-%y-%m",
|
||||||
"indexname-value1-14-12",
|
Expected: "indexname-value1-14-12",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{"tag1", "tag2"},
|
TagKeys: []string{"tag1", "tag2"},
|
||||||
"indexname-%s-%s-%y-%m",
|
IndexName: "indexname-%s-%s-%y-%m",
|
||||||
"indexname-value1-value2-14-12",
|
Expected: "indexname-value1-value2-14-12",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{"tag1", "tag2", "tag3"},
|
TagKeys: []string{"tag1", "tag2", "tag3"},
|
||||||
"indexname-%s-%s-%s-%y-%m",
|
IndexName: "indexname-%s-%s-%s-%y-%m",
|
||||||
"indexname-value1-value2-none-14-12",
|
Expected: "indexname-value1-value2-none-14-12",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
|
@ -569,28 +562,24 @@ func TestGetPipelineName(t *testing.T) {
|
||||||
Expected string
|
Expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
Expected: "myDefaultPipeline",
|
||||||
"myDefaultPipeline",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
Expected: "myDefaultPipeline",
|
||||||
"myDefaultPipeline",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
|
Tags: map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
|
||||||
[]string{},
|
Expected: "myOtherPipeline",
|
||||||
"myOtherPipeline",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
Tags: map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
||||||
[]string{},
|
Expected: "pipeline2",
|
||||||
"pipeline2",
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
|
@ -619,70 +608,59 @@ func TestPipelineConfigs(t *testing.T) {
|
||||||
Elastic *Elasticsearch
|
Elastic *Elasticsearch
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
Elastic: &Elasticsearch{
|
||||||
"",
|
|
||||||
&Elasticsearch{
|
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
Elastic: &Elasticsearch{
|
||||||
"",
|
|
||||||
&Elasticsearch{
|
|
||||||
DefaultPipeline: "myDefaultPipeline",
|
DefaultPipeline: "myDefaultPipeline",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
|
Tags: map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
|
||||||
[]string{},
|
Expected: "myDefaultPipeline",
|
||||||
"myDefaultPipeline",
|
Elastic: &Elasticsearch{
|
||||||
&Elasticsearch{
|
|
||||||
UsePipeline: "myDefaultPipeline",
|
UsePipeline: "myDefaultPipeline",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
Tags: map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
||||||
[]string{},
|
Elastic: &Elasticsearch{
|
||||||
"",
|
|
||||||
&Elasticsearch{
|
|
||||||
DefaultPipeline: "myDefaultPipeline",
|
DefaultPipeline: "myDefaultPipeline",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
Tags: map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
||||||
[]string{},
|
Expected: "pipeline2",
|
||||||
"pipeline2",
|
Elastic: &Elasticsearch{
|
||||||
&Elasticsearch{
|
|
||||||
UsePipeline: "{{es-pipeline}}",
|
UsePipeline: "{{es-pipeline}}",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
Tags: map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
|
||||||
[]string{},
|
Expected: "value1-pipeline2",
|
||||||
"value1-pipeline2",
|
Elastic: &Elasticsearch{
|
||||||
&Elasticsearch{
|
|
||||||
UsePipeline: "{{tag1}}-{{es-pipeline}}",
|
UsePipeline: "{{tag1}}-{{es-pipeline}}",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
EventTime: time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
|
||||||
map[string]string{"tag1": "value1"},
|
Tags: map[string]string{"tag1": "value1"},
|
||||||
[]string{},
|
Elastic: &Elasticsearch{
|
||||||
"",
|
|
||||||
&Elasticsearch{
|
|
||||||
UsePipeline: "{{es-pipeline}}",
|
UsePipeline: "{{es-pipeline}}",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -128,7 +128,6 @@ func TestExec(t *testing.T) {
|
||||||
name: "test no metrics output",
|
name: "test no metrics output",
|
||||||
command: []string{"tee"},
|
command: []string{"tee"},
|
||||||
err: false,
|
err: false,
|
||||||
metrics: []telegraf.Metric{},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -377,9 +377,10 @@ func (g *Graylog) connectRetry(tlsCfg *tls.Config) {
|
||||||
|
|
||||||
g.wg.Add(1)
|
g.wg.Add(1)
|
||||||
|
|
||||||
unconnected := append([]string{}, g.Servers...)
|
servers := make([]string, 0, len(g.Servers))
|
||||||
|
servers = append(servers, g.Servers...)
|
||||||
for {
|
for {
|
||||||
unconnected, gelfs := g.connectEndpoints(unconnected, tlsCfg)
|
unconnected, gelfs := g.connectEndpoints(servers, tlsCfg)
|
||||||
for _, w := range gelfs {
|
for _, w := range gelfs {
|
||||||
writers = append(writers, w)
|
writers = append(writers, w)
|
||||||
closers = append(closers, w)
|
closers = append(closers, w)
|
||||||
|
|
@ -467,8 +468,6 @@ func (g *Graylog) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
|
func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
|
||||||
out := []string{}
|
|
||||||
|
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
m["version"] = "1.1"
|
m["version"] = "1.1"
|
||||||
m["timestamp"] = float64(metric.Time().UnixNano()) / 1_000_000_000
|
m["timestamp"] = float64(metric.Time().UnixNano()) / 1_000_000_000
|
||||||
|
|
@ -484,7 +483,7 @@ func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
|
||||||
} else {
|
} else {
|
||||||
host, err := os.Hostname()
|
host, err := os.Hostname()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []string{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
m["host"] = host
|
m["host"] = host
|
||||||
}
|
}
|
||||||
|
|
@ -513,11 +512,10 @@ func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
|
||||||
|
|
||||||
serialized, err := ejson.Marshal(m)
|
serialized, err := ejson.Marshal(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []string{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out = append(out, string(serialized))
|
|
||||||
|
|
||||||
return out, nil
|
return []string{string(serialized)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func fieldInSpec(field string) bool {
|
func fieldInSpec(field string) bool {
|
||||||
|
|
|
||||||
|
|
@ -180,7 +180,7 @@ func TestMetricConversionToRecordsWithTags(t *testing.T) {
|
||||||
),
|
),
|
||||||
newMetricWithOrderedFields(
|
newMetricWithOrderedFields(
|
||||||
"root.computer.keyboard",
|
"root.computer.keyboard",
|
||||||
[]telegraf.Tag{},
|
nil,
|
||||||
[]telegraf.Field{
|
[]telegraf.Field{
|
||||||
{Key: "temperature", Value: float64(30.33)},
|
{Key: "temperature", Value: float64(30.33)},
|
||||||
{Key: "counter", Value: int64(123456789)},
|
{Key: "counter", Value: int64(123456789)},
|
||||||
|
|
@ -206,7 +206,7 @@ func TestMetricConversionToRecordsWithTags(t *testing.T) {
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
newMetricWithOrderedFields(
|
newMetricWithOrderedFields(
|
||||||
"root.computer.uint_to_text",
|
"root.computer.uint_to_text",
|
||||||
[]telegraf.Tag{},
|
nil,
|
||||||
[]telegraf.Field{
|
[]telegraf.Field{
|
||||||
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
||||||
},
|
},
|
||||||
|
|
@ -227,7 +227,7 @@ func TestMetricConversionToRecordsWithTags(t *testing.T) {
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
newMetricWithOrderedFields(
|
newMetricWithOrderedFields(
|
||||||
"root.computer.overflow",
|
"root.computer.overflow",
|
||||||
[]telegraf.Tag{},
|
nil,
|
||||||
[]telegraf.Field{
|
[]telegraf.Field{
|
||||||
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
||||||
},
|
},
|
||||||
|
|
@ -248,7 +248,7 @@ func TestMetricConversionToRecordsWithTags(t *testing.T) {
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
newMetricWithOrderedFields(
|
newMetricWithOrderedFields(
|
||||||
"root.computer.second",
|
"root.computer.second",
|
||||||
[]telegraf.Tag{},
|
nil,
|
||||||
[]telegraf.Field{
|
[]telegraf.Field{
|
||||||
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
||||||
},
|
},
|
||||||
|
|
@ -320,10 +320,9 @@ func TestTagSanitization(t *testing.T) {
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
tt.plugin.Log = &testutil.Logger{}
|
tt.plugin.Log = &testutil.Logger{}
|
||||||
actuals := []string{}
|
|
||||||
|
|
||||||
require.NoError(t, tt.plugin.Init())
|
require.NoError(t, tt.plugin.Init())
|
||||||
|
|
||||||
|
actuals := make([]string, 0, len(tt.input))
|
||||||
for _, input := range tt.input {
|
for _, input := range tt.input {
|
||||||
//nolint:errcheck // error cases handled by expected vs actual comparison
|
//nolint:errcheck // error cases handled by expected vs actual comparison
|
||||||
actual, _ := tt.plugin.validateTag(input)
|
actual, _ := tt.plugin.validateTag(input)
|
||||||
|
|
@ -591,7 +590,7 @@ func TestIntegrationInserts(t *testing.T) {
|
||||||
metrics := []telegraf.Metric{
|
metrics := []telegraf.Metric{
|
||||||
newMetricWithOrderedFields(
|
newMetricWithOrderedFields(
|
||||||
"root.computer.unsigned_big",
|
"root.computer.unsigned_big",
|
||||||
[]telegraf.Tag{},
|
nil,
|
||||||
[]telegraf.Field{
|
[]telegraf.Field{
|
||||||
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
|
||||||
},
|
},
|
||||||
|
|
@ -623,7 +622,7 @@ func TestIntegrationInserts(t *testing.T) {
|
||||||
),
|
),
|
||||||
newMetricWithOrderedFields(
|
newMetricWithOrderedFields(
|
||||||
"root.computer.keyboard",
|
"root.computer.keyboard",
|
||||||
[]telegraf.Tag{},
|
nil,
|
||||||
[]telegraf.Field{
|
[]telegraf.Field{
|
||||||
{Key: "temperature", Value: float64(30.33)},
|
{Key: "temperature", Value: float64(30.33)},
|
||||||
{Key: "counter", Value: int64(123456789)},
|
{Key: "counter", Value: int64(123456789)},
|
||||||
|
|
|
||||||
|
|
@ -157,8 +157,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
r := []types.PutRecordsRequestEntry{}
|
r := make([]types.PutRecordsRequestEntry, 0, len(metrics))
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
sz++
|
sz++
|
||||||
|
|
||||||
|
|
@ -176,7 +175,6 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
r = append(r, d)
|
r = append(r, d)
|
||||||
|
|
||||||
if sz == maxRecordsPerRequest {
|
if sz == maxRecordsPerRequest {
|
||||||
elapsed := k.writeKinesis(r)
|
elapsed := k.writeKinesis(r)
|
||||||
k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
|
k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
|
||||||
|
|
|
||||||
|
|
@ -182,7 +182,6 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
|
||||||
records := []types.PutRecordsRequestEntry{
|
records := []types.PutRecordsRequestEntry{
|
||||||
{
|
{
|
||||||
PartitionKey: aws.String(testPartitionKey),
|
PartitionKey: aws.String(testPartitionKey),
|
||||||
Data: []byte{},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -225,10 +224,10 @@ func TestWrite_NoMetrics(t *testing.T) {
|
||||||
svc: svc,
|
svc: svc,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := k.Write([]telegraf.Metric{})
|
err := k.Write(nil)
|
||||||
require.NoError(t, err, "Should not return error")
|
require.NoError(t, err, "Should not return error")
|
||||||
|
|
||||||
svc.AssertRequests(t, []*kinesis.PutRecordsInput{})
|
svc.AssertRequests(t, make([]*kinesis.PutRecordsInput, 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite_SingleMetric(t *testing.T) {
|
func TestWrite_SingleMetric(t *testing.T) {
|
||||||
|
|
@ -480,12 +479,8 @@ func (m *mockKinesisPutRecords) SetupResponse(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockKinesisPutRecords) SetupGenericResponse(
|
func (m *mockKinesisPutRecords) SetupGenericResponse(successfulRecordCount uint32, failedRecordCount int32) {
|
||||||
successfulRecordCount uint32,
|
records := make([]types.PutRecordsResultEntry, 0, int32(successfulRecordCount)+failedRecordCount)
|
||||||
failedRecordCount int32,
|
|
||||||
) {
|
|
||||||
records := []types.PutRecordsResultEntry{}
|
|
||||||
|
|
||||||
for i := uint32(0); i < successfulRecordCount; i++ {
|
for i := uint32(0); i < successfulRecordCount; i++ {
|
||||||
records = append(records, types.PutRecordsResultEntry{
|
records = append(records, types.PutRecordsResultEntry{
|
||||||
SequenceNumber: aws.String(testSequenceNumber),
|
SequenceNumber: aws.String(testSequenceNumber),
|
||||||
|
|
|
||||||
|
|
@ -91,8 +91,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
|
||||||
l.Template = l.SourceTag
|
l.Template = l.SourceTag
|
||||||
}
|
}
|
||||||
|
|
||||||
tempGauges := []*Gauge{}
|
var tempGauges []*Gauge
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
if gauges, err := l.buildGauges(m); err == nil {
|
if gauges, err := l.buildGauges(m); err == nil {
|
||||||
for _, gauge := range gauges {
|
for _, gauge := range gauges {
|
||||||
|
|
@ -180,17 +179,16 @@ func (l *Librato) writeBatch(start, sizeBatch, metricCounter int, tempGauges []*
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
||||||
gauges := []*Gauge{}
|
|
||||||
if m.Time().Unix() == 0 {
|
if m.Time().Unix() == 0 {
|
||||||
return gauges, fmt.Errorf("time was zero %s", m.Name())
|
return nil, fmt.Errorf("time was zero %s", m.Name())
|
||||||
}
|
}
|
||||||
metricSource := graphite.InsertField(
|
|
||||||
graphite.SerializeBucketName("", m.Tags(), l.Template, ""),
|
metricSource := graphite.InsertField(graphite.SerializeBucketName("", m.Tags(), l.Template, ""), "value")
|
||||||
"value")
|
|
||||||
if metricSource == "" {
|
if metricSource == "" {
|
||||||
return gauges,
|
return nil, fmt.Errorf("undeterminable Source type from Field, %s", l.Template)
|
||||||
fmt.Errorf("undeterminable Source type from Field, %s", l.Template)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gauges := make([]*Gauge, 0, len(m.Fields()))
|
||||||
for fieldName, value := range m.Fields() {
|
for fieldName, value := range m.Fields() {
|
||||||
metricName := m.Name()
|
metricName := m.Name()
|
||||||
if fieldName != "value" {
|
if fieldName != "value" {
|
||||||
|
|
@ -206,7 +204,7 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := gauge.setValue(value); err != nil {
|
if err := gauge.setValue(value); err != nil {
|
||||||
return gauges, fmt.Errorf("unable to extract value from Fields: %w", err)
|
return nil, fmt.Errorf("unable to extract value from Fields: %w", err)
|
||||||
}
|
}
|
||||||
gauges = append(gauges, gauge)
|
gauges = append(gauges, gauge)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -110,28 +110,22 @@ func TestGetPipelineName(t *testing.T) {
|
||||||
Expected string
|
Expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
UsePipeline: `{{.Tag "es-pipeline"}}`,
|
||||||
`{{.Tag "es-pipeline"}}`,
|
Expected: "myDefaultPipeline",
|
||||||
"myDefaultPipeline",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||||
[]string{},
|
|
||||||
``,
|
|
||||||
"",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
|
Tags: map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
|
||||||
[]string{},
|
UsePipeline: `{{.Tag "es-pipeline"}}`,
|
||||||
`{{.Tag "es-pipeline"}}`,
|
Expected: "myOtherPipeline",
|
||||||
"myOtherPipeline",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
map[string]string{"tag1": "pipeline2", "es-pipeline": "myOtherPipeline"},
|
Tags: map[string]string{"tag1": "pipeline2", "es-pipeline": "myOtherPipeline"},
|
||||||
[]string{},
|
UsePipeline: `{{.Tag "tag1"}}`,
|
||||||
`{{.Tag "tag1"}}`,
|
Expected: "pipeline2",
|
||||||
"pipeline2",
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
|
|
||||||
|
|
@ -125,7 +125,7 @@ func (o *OpenTelemetry) Close() error {
|
||||||
// Split metrics up by timestamp and send to Google Cloud Stackdriver
|
// Split metrics up by timestamp and send to Google Cloud Stackdriver
|
||||||
func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error {
|
func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error {
|
||||||
metricBatch := make(map[int64][]telegraf.Metric)
|
metricBatch := make(map[int64][]telegraf.Metric)
|
||||||
timestamps := []int64{}
|
timestamps := make([]int64, 0, len(metrics))
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
timestamp := metric.Time().UnixNano()
|
timestamp := metric.Time().UnixNano()
|
||||||
if existingSlice, ok := metricBatch[timestamp]; ok {
|
if existingSlice, ok := metricBatch[timestamp]; ok {
|
||||||
|
|
|
||||||
|
|
@ -324,37 +324,22 @@ func (cols Columns) Keys() Columns {
|
||||||
//
|
//
|
||||||
// Columns are sorted so that they are in order as: [Time, Tags, Fields], with the columns within each group sorted alphabetically.
|
// Columns are sorted so that they are in order as: [Time, Tags, Fields], with the columns within each group sorted alphabetically.
|
||||||
func (cols Columns) Sorted() Columns {
|
func (cols Columns) Sorted() Columns {
|
||||||
newCols := append([]Column{}, cols...)
|
newCols := make(Columns, 0, len(cols))
|
||||||
|
newCols = append(newCols, cols...)
|
||||||
(*utils.ColumnList)(unsafe.Pointer(&newCols)).Sort() //nolint:gosec // G103: Valid use of unsafe call to speed up sorting
|
(*utils.ColumnList)(unsafe.Pointer(&newCols)).Sort() //nolint:gosec // G103: Valid use of unsafe call to speed up sorting
|
||||||
return newCols
|
return newCols
|
||||||
}
|
}
|
||||||
|
|
||||||
// Concat returns a copy of Columns with the given tcsList appended to the end.
|
// Concat returns a copy of Columns with the given tcsList appended to the end.
|
||||||
func (cols Columns) Concat(tcsList ...Columns) Columns {
|
func (cols Columns) Concat(tcsList ...Columns) Columns {
|
||||||
tcsNew := append(Columns{}, cols...)
|
tcsNew := make(Columns, 0, len(cols)+len(tcsList))
|
||||||
|
tcsNew = append(tcsNew, cols...)
|
||||||
for _, tcs := range tcsList {
|
for _, tcs := range tcsList {
|
||||||
tcsNew = append(tcsNew, tcs...)
|
tcsNew = append(tcsNew, tcs...)
|
||||||
}
|
}
|
||||||
return tcsNew
|
return tcsNew
|
||||||
}
|
}
|
||||||
|
|
||||||
// Union generates a list of SQL selectors against the given columns.
|
|
||||||
//
|
|
||||||
// For each column in tcs, if the column also exist in tcsFrom, it will be selected. If the column does not exist NULL will be selected.
|
|
||||||
func (cols Columns) Union(tcsFrom Columns) Columns {
|
|
||||||
tcsNew := append(Columns{}, cols...)
|
|
||||||
TCS:
|
|
||||||
for i, tc := range cols {
|
|
||||||
for _, tcFrom := range tcsFrom {
|
|
||||||
if tc.Name == tcFrom.Name {
|
|
||||||
continue TCS
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tcsNew[i].Type = ""
|
|
||||||
}
|
|
||||||
return tcsNew
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tags returns a Columns list of the columns which are tags.
|
// Tags returns a Columns list of the columns which are tags.
|
||||||
func (cols Columns) Tags() Columns {
|
func (cols Columns) Tags() Columns {
|
||||||
var newCols []Column
|
var newCols []Column
|
||||||
|
|
|
||||||
|
|
@ -317,7 +317,7 @@ func TestTableManagerIntegration_noAlterMissingTag(t *testing.T) {
|
||||||
|
|
||||||
p, err := newPostgresqlTest(t)
|
p, err := newPostgresqlTest(t)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
p.AddColumnTemplates = []*sqltemplate.Template{}
|
p.AddColumnTemplates = make([]*sqltemplate.Template, 0)
|
||||||
require.NoError(t, p.Connect())
|
require.NoError(t, p.Connect())
|
||||||
|
|
||||||
metrics := []telegraf.Metric{
|
metrics := []telegraf.Metric{
|
||||||
|
|
@ -345,7 +345,7 @@ func TestTableManagerIntegration_noAlterMissingTagTableTag(t *testing.T) {
|
||||||
p, err := newPostgresqlTest(t)
|
p, err := newPostgresqlTest(t)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
p.TagsAsForeignKeys = true
|
p.TagsAsForeignKeys = true
|
||||||
p.TagTableAddColumnTemplates = []*sqltemplate.Template{}
|
p.TagTableAddColumnTemplates = make([]*sqltemplate.Template, 0)
|
||||||
require.NoError(t, p.Connect())
|
require.NoError(t, p.Connect())
|
||||||
|
|
||||||
metrics := []telegraf.Metric{
|
metrics := []telegraf.Metric{
|
||||||
|
|
@ -403,7 +403,7 @@ func TestTableManagerIntegration_noAlterMissingField(t *testing.T) {
|
||||||
|
|
||||||
p, err := newPostgresqlTest(t)
|
p, err := newPostgresqlTest(t)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
p.AddColumnTemplates = []*sqltemplate.Template{}
|
p.AddColumnTemplates = make([]*sqltemplate.Template, 0)
|
||||||
require.NoError(t, p.Connect())
|
require.NoError(t, p.Connect())
|
||||||
|
|
||||||
metrics := []telegraf.Metric{
|
metrics := []telegraf.Metric{
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event {
|
func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event {
|
||||||
events := []*raidman.Event{}
|
events := make([]*raidman.Event, 0, len(m.Fields()))
|
||||||
for fieldName, value := range m.Fields() {
|
for fieldName, value := range m.Fields() {
|
||||||
// get host for Riemann event
|
// get host for Riemann event
|
||||||
host, ok := m.Tags()["host"]
|
host, ok := m.Tags()["host"]
|
||||||
|
|
|
||||||
|
|
@ -326,7 +326,7 @@ func (s *Sensu) encodeToJSON(metricPoints []*outputMetric) ([]byte, error) {
|
||||||
|
|
||||||
check, err := s.getCheck(metricPoints)
|
check, err := s.getCheck(metricPoints)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return make([]byte, 0), err
|
||||||
}
|
}
|
||||||
|
|
||||||
output, err := json.Marshal(&outputEvent{
|
output, err := json.Marshal(&outputEvent{
|
||||||
|
|
@ -390,7 +390,7 @@ func (s *Sensu) getCheck(metricPoints []*outputMetric) (*outputCheck, error) {
|
||||||
|
|
||||||
func (s *Sensu) getHandlers() []string {
|
func (s *Sensu) getHandlers() []string {
|
||||||
if s.Metrics == nil || s.Metrics.Handlers == nil {
|
if s.Metrics == nil || s.Metrics.Handlers == nil {
|
||||||
return []string{}
|
return make([]string, 0)
|
||||||
}
|
}
|
||||||
return s.Metrics.Handlers
|
return s.Metrics.Handlers
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,22 +18,22 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type sink struct {
|
type sink struct {
|
||||||
dps []*datapoint.Datapoint
|
datapoints []*datapoint.Datapoint
|
||||||
evs []*event.Event
|
events []*event.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sink) AddDatapoints(_ context.Context, points []*datapoint.Datapoint) error {
|
func (s *sink) AddDatapoints(_ context.Context, points []*datapoint.Datapoint) error {
|
||||||
s.dps = append(s.dps, points...)
|
s.datapoints = append(s.datapoints, points...)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (s *sink) AddEvents(_ context.Context, events []*event.Event) error {
|
func (s *sink) AddEvents(_ context.Context, events []*event.Event) error {
|
||||||
s.evs = append(s.evs, events...)
|
s.events = append(s.events, events...)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type errorsink struct {
|
type errorsink struct {
|
||||||
dps []*datapoint.Datapoint
|
datapoints []*datapoint.Datapoint
|
||||||
evs []*event.Event
|
events []*event.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *errorsink) AddDatapoints(_ context.Context, _ []*datapoint.Datapoint) error {
|
func (e *errorsink) AddDatapoints(_ context.Context, _ []*datapoint.Datapoint) error {
|
||||||
|
|
@ -42,6 +42,7 @@ func (e *errorsink) AddDatapoints(_ context.Context, _ []*datapoint.Datapoint) e
|
||||||
func (e *errorsink) AddEvents(_ context.Context, _ []*event.Event) error {
|
func (e *errorsink) AddEvents(_ context.Context, _ []*event.Event) error {
|
||||||
return errors.New("not sending events")
|
return errors.New("not sending events")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSignalFx_SignalFx(t *testing.T) {
|
func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
type measurement struct {
|
type measurement struct {
|
||||||
name string
|
name string
|
||||||
|
|
@ -53,15 +54,11 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
type fields struct {
|
type fields struct {
|
||||||
IncludedEvents []string
|
IncludedEvents []string
|
||||||
}
|
}
|
||||||
type want struct {
|
|
||||||
datapoints []*datapoint.Datapoint
|
|
||||||
events []*event.Event
|
|
||||||
}
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
fields fields
|
fields fields
|
||||||
measurements []*measurement
|
measurements []*measurement
|
||||||
want want
|
want errorsink
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "add datapoints of all types",
|
name: "add datapoints of all types",
|
||||||
|
|
@ -121,7 +118,7 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: errorsink{
|
||||||
datapoints: []*datapoint.Datapoint{
|
datapoints: []*datapoint.Datapoint{
|
||||||
datapoint.New(
|
datapoint.New(
|
||||||
"datapoint.mymeasurement",
|
"datapoint.mymeasurement",
|
||||||
|
|
@ -188,7 +185,7 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
datapoint.Gauge,
|
datapoint.Gauge,
|
||||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)),
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)),
|
||||||
},
|
},
|
||||||
events: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -239,8 +236,8 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
time: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: errorsink{
|
||||||
datapoints: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
events: []*event.Event{
|
events: []*event.Event{
|
||||||
event.NewWithProperties(
|
event.NewWithProperties(
|
||||||
"event.mymeasurement",
|
"event.mymeasurement",
|
||||||
|
|
@ -317,9 +314,9 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
tp: telegraf.Gauge,
|
tp: telegraf.Gauge,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: errorsink{
|
||||||
datapoints: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
events: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -334,7 +331,7 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
tp: telegraf.Gauge,
|
tp: telegraf.Gauge,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: errorsink{
|
||||||
datapoints: []*datapoint.Datapoint{
|
datapoints: []*datapoint.Datapoint{
|
||||||
datapoint.New(
|
datapoint.New(
|
||||||
"datapoint",
|
"datapoint",
|
||||||
|
|
@ -345,7 +342,7 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
datapoint.Gauge,
|
datapoint.Gauge,
|
||||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)),
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC)),
|
||||||
},
|
},
|
||||||
events: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -362,8 +359,8 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
tp: telegraf.Untyped,
|
tp: telegraf.Untyped,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: errorsink{
|
||||||
datapoints: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
events: []*event.Event{
|
events: []*event.Event{
|
||||||
event.NewWithProperties(
|
event.NewWithProperties(
|
||||||
"event.mymeasurement",
|
"event.mymeasurement",
|
||||||
|
|
@ -390,9 +387,9 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
tp: telegraf.Gauge,
|
tp: telegraf.Gauge,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: errorsink{
|
||||||
datapoints: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
events: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -407,9 +404,9 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
tp: telegraf.Gauge,
|
tp: telegraf.Gauge,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: errorsink{
|
||||||
datapoints: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
events: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -423,30 +420,25 @@ func TestSignalFx_SignalFx(t *testing.T) {
|
||||||
require.NoError(t, s.Connect())
|
require.NoError(t, s.Connect())
|
||||||
|
|
||||||
s.client = &sink{
|
s.client = &sink{
|
||||||
dps: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
evs: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
measurements := []telegraf.Metric{}
|
measurements := make([]telegraf.Metric, 0, len(tt.measurements))
|
||||||
|
|
||||||
for _, measurement := range tt.measurements {
|
for _, measurement := range tt.measurements {
|
||||||
m := metric.New(
|
measurements = append(measurements, metric.New(measurement.name, measurement.tags, measurement.fields, measurement.time, measurement.tp))
|
||||||
measurement.name, measurement.tags, measurement.fields, measurement.time, measurement.tp,
|
|
||||||
)
|
|
||||||
|
|
||||||
measurements = append(measurements, m)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.Write(measurements)
|
err := s.Write(measurements)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Eventually(t, func() bool { return len(s.client.(*sink).dps) == len(tt.want.datapoints) }, 5*time.Second, 10*time.Millisecond)
|
require.Eventually(t, func() bool { return len(s.client.(*sink).datapoints) == len(tt.want.datapoints) }, 5*time.Second, 10*time.Millisecond)
|
||||||
require.Eventually(t, func() bool { return len(s.client.(*sink).evs) == len(tt.want.events) }, 5*time.Second, 10*time.Millisecond)
|
require.Eventually(t, func() bool { return len(s.client.(*sink).events) == len(tt.want.events) }, 5*time.Second, 10*time.Millisecond)
|
||||||
|
|
||||||
if !reflect.DeepEqual(s.client.(*sink).dps, tt.want.datapoints) {
|
if !reflect.DeepEqual(s.client.(*sink).datapoints, tt.want.datapoints) {
|
||||||
t.Errorf("Collected datapoints do not match desired. Collected: %v Desired: %v", s.client.(*sink).dps, tt.want.datapoints)
|
t.Errorf("Collected datapoints do not match desired. Collected: %v Desired: %v", s.client.(*sink).datapoints, tt.want.datapoints)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(s.client.(*sink).evs, tt.want.events) {
|
if !reflect.DeepEqual(s.client.(*sink).events, tt.want.events) {
|
||||||
t.Errorf("Collected events do not match desired. Collected: %v Desired: %v", s.client.(*sink).evs, tt.want.events)
|
t.Errorf("Collected events do not match desired. Collected: %v Desired: %v", s.client.(*sink).events, tt.want.events)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
@ -520,8 +512,8 @@ func TestSignalFx_Errors(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: want{
|
||||||
datapoints: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
events: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -573,8 +565,8 @@ func TestSignalFx_Errors(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: want{
|
want: want{
|
||||||
datapoints: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
events: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -589,8 +581,8 @@ func TestSignalFx_Errors(t *testing.T) {
|
||||||
require.NoError(t, s.Connect())
|
require.NoError(t, s.Connect())
|
||||||
|
|
||||||
s.client = &errorsink{
|
s.client = &errorsink{
|
||||||
dps: []*datapoint.Datapoint{},
|
datapoints: make([]*datapoint.Datapoint, 0),
|
||||||
evs: []*event.Event{},
|
events: make([]*event.Event, 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, measurement := range tt.measurements {
|
for _, measurement := range tt.measurements {
|
||||||
|
|
@ -601,14 +593,14 @@ func TestSignalFx_Errors(t *testing.T) {
|
||||||
err := s.Write([]telegraf.Metric{m})
|
err := s.Write([]telegraf.Metric{m})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
for !(len(s.client.(*errorsink).dps) == len(tt.want.datapoints) && len(s.client.(*errorsink).evs) == len(tt.want.events)) {
|
for !(len(s.client.(*errorsink).datapoints) == len(tt.want.datapoints) && len(s.client.(*errorsink).events) == len(tt.want.events)) {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(s.client.(*errorsink).dps, tt.want.datapoints) {
|
if !reflect.DeepEqual(s.client.(*errorsink).datapoints, tt.want.datapoints) {
|
||||||
t.Errorf("Collected datapoints do not match desired. Collected: %v Desired: %v", s.client.(*errorsink).dps, tt.want.datapoints)
|
t.Errorf("Collected datapoints do not match desired. Collected: %v Desired: %v", s.client.(*errorsink).datapoints, tt.want.datapoints)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(s.client.(*errorsink).evs, tt.want.events) {
|
if !reflect.DeepEqual(s.client.(*errorsink).events, tt.want.events) {
|
||||||
t.Errorf("Collected events do not match desired. Collected: %v Desired: %v", s.client.(*errorsink).evs, tt.want.events)
|
t.Errorf("Collected events do not match desired. Collected: %v Desired: %v", s.client.(*errorsink).events, tt.want.events)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -78,8 +78,7 @@ func TestSocketWriter_unixgram(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSocketWriterStream(t *testing.T, sw *SocketWriter, lconn net.Conn) {
|
func testSocketWriterStream(t *testing.T, sw *SocketWriter, lconn net.Conn) {
|
||||||
metrics := []telegraf.Metric{}
|
metrics := []telegraf.Metric{testutil.TestMetric(1, "test")}
|
||||||
metrics = append(metrics, testutil.TestMetric(1, "test"))
|
|
||||||
mbs1out, err := sw.Serialize(metrics[0])
|
mbs1out, err := sw.Serialize(metrics[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mbs1out, err = sw.encoder.Encode(mbs1out)
|
mbs1out, err = sw.encoder.Encode(mbs1out)
|
||||||
|
|
@ -104,8 +103,7 @@ func testSocketWriterStream(t *testing.T, sw *SocketWriter, lconn net.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSocketWriterPacket(t *testing.T, sw *SocketWriter, lconn net.PacketConn) {
|
func testSocketWriterPacket(t *testing.T, sw *SocketWriter, lconn net.PacketConn) {
|
||||||
metrics := []telegraf.Metric{}
|
metrics := []telegraf.Metric{testutil.TestMetric(1, "test")}
|
||||||
metrics = append(metrics, testutil.TestMetric(1, "test"))
|
|
||||||
mbs1out, err := sw.Serialize(metrics[0])
|
mbs1out, err := sw.Serialize(metrics[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mbs1out, err = sw.encoder.Encode(mbs1out)
|
mbs1out, err = sw.encoder.Encode(mbs1out)
|
||||||
|
|
|
||||||
|
|
@ -188,7 +188,7 @@ func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f []*telegraf.Field, ts *mon
|
||||||
// Split metrics up by timestamp and send to Google Cloud Stackdriver
|
// Split metrics up by timestamp and send to Google Cloud Stackdriver
|
||||||
func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
|
func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
|
||||||
metricBatch := make(map[int64][]telegraf.Metric)
|
metricBatch := make(map[int64][]telegraf.Metric)
|
||||||
timestamps := []int64{}
|
timestamps := make([]int64, 0, len(metrics))
|
||||||
for _, metric := range sorted(metrics) {
|
for _, metric := range sorted(metrics) {
|
||||||
timestamp := metric.Time().UnixNano()
|
timestamp := metric.Time().UnixNano()
|
||||||
if existingSlice, ok := metricBatch[timestamp]; ok {
|
if existingSlice, ok := metricBatch[timestamp]; ok {
|
||||||
|
|
|
||||||
|
|
@ -129,14 +129,13 @@ func TestSyslogWriteWithUdp(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) {
|
func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) {
|
||||||
metrics := []telegraf.Metric{}
|
|
||||||
m1 := metric.New(
|
m1 := metric.New(
|
||||||
"testmetric",
|
"testmetric",
|
||||||
map[string]string{},
|
map[string]string{},
|
||||||
map[string]interface{}{},
|
map[string]interface{}{},
|
||||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC))
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
metrics = append(metrics, m1)
|
metrics := []telegraf.Metric{m1}
|
||||||
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
||||||
|
|
@ -153,14 +152,13 @@ func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) {
|
||||||
|
|
||||||
func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) {
|
func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) {
|
||||||
s.Framing = "non-transparent"
|
s.Framing = "non-transparent"
|
||||||
metrics := []telegraf.Metric{}
|
|
||||||
m1 := metric.New(
|
m1 := metric.New(
|
||||||
"testmetric",
|
"testmetric",
|
||||||
map[string]string{},
|
map[string]string{},
|
||||||
map[string]interface{}{},
|
map[string]interface{}{},
|
||||||
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC))
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC))
|
||||||
|
|
||||||
metrics = append(metrics, m1)
|
metrics := []telegraf.Metric{m1}
|
||||||
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
|
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ func TestPartitionRecords(t *testing.T) {
|
||||||
twoDatum := []types.Record{testDatum, testDatum}
|
twoDatum := []types.Record{testDatum, testDatum}
|
||||||
threeDatum := []types.Record{testDatum, testDatum, testDatum}
|
threeDatum := []types.Record{testDatum, testDatum, testDatum}
|
||||||
|
|
||||||
require.Equal(t, [][]types.Record{}, partitionRecords(2, zeroDatum))
|
require.Empty(t, partitionRecords(2, zeroDatum))
|
||||||
require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum))
|
require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum))
|
||||||
require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum))
|
require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum))
|
||||||
require.Equal(t, [][]types.Record{twoDatum}, partitionRecords(2, twoDatum))
|
require.Equal(t, [][]types.Record{twoDatum}, partitionRecords(2, twoDatum))
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,6 @@ func TestAddAndPush(t *testing.T) {
|
||||||
time.Now()),
|
time.Now()),
|
||||||
},
|
},
|
||||||
OperationPush{},
|
OperationPush{},
|
||||||
OperationCheck{},
|
|
||||||
},
|
},
|
||||||
"simple Add, Push and check generated LLD metric": {
|
"simple Add, Push and check generated LLD metric": {
|
||||||
OperationAdd{
|
OperationAdd{
|
||||||
|
|
@ -227,7 +226,6 @@ func TestAddAndPush(t *testing.T) {
|
||||||
time.Now(),
|
time.Now(),
|
||||||
)},
|
)},
|
||||||
OperationPush{},
|
OperationPush{},
|
||||||
OperationCheck{},
|
|
||||||
},
|
},
|
||||||
"after lld_clear_interval, already seen LLDs could be resend": {
|
"after lld_clear_interval, already seen LLDs could be resend": {
|
||||||
OperationAdd{testutil.MustMetric(
|
OperationAdd{testutil.MustMetric(
|
||||||
|
|
@ -252,7 +250,6 @@ func TestAddAndPush(t *testing.T) {
|
||||||
time.Now(),
|
time.Now(),
|
||||||
)},
|
)},
|
||||||
OperationPush{},
|
OperationPush{},
|
||||||
OperationCheck{},
|
|
||||||
OperationCrossClearIntervalTime{}, // The clear of the previous LLD seen is done in the next push
|
OperationCrossClearIntervalTime{}, // The clear of the previous LLD seen is done in the next push
|
||||||
OperationAdd{testutil.MustMetric(
|
OperationAdd{testutil.MustMetric(
|
||||||
"name",
|
"name",
|
||||||
|
|
@ -261,7 +258,6 @@ func TestAddAndPush(t *testing.T) {
|
||||||
time.Now(),
|
time.Now(),
|
||||||
)},
|
)},
|
||||||
OperationPush{},
|
OperationPush{},
|
||||||
OperationCheck{},
|
|
||||||
OperationAdd{testutil.MustMetric(
|
OperationAdd{testutil.MustMetric(
|
||||||
"name",
|
"name",
|
||||||
map[string]string{"host": "hostA", "foo": "bar"},
|
map[string]string{"host": "hostA", "foo": "bar"},
|
||||||
|
|
@ -301,7 +297,6 @@ func TestAddAndPush(t *testing.T) {
|
||||||
time.Now(),
|
time.Now(),
|
||||||
)},
|
)},
|
||||||
OperationPush{},
|
OperationPush{},
|
||||||
OperationCheck{}, // LLD has already been sent for this metric
|
|
||||||
// In this interval between push, the metric is not received
|
// In this interval between push, the metric is not received
|
||||||
OperationCrossClearIntervalTime{}, // The clear of the previous LLD seen is done in the next push
|
OperationCrossClearIntervalTime{}, // The clear of the previous LLD seen is done in the next push
|
||||||
OperationPush{},
|
OperationPush{},
|
||||||
|
|
@ -605,8 +600,7 @@ func TestAddAndPush(t *testing.T) {
|
||||||
current: make(map[uint64]lldInfo),
|
current: make(map[uint64]lldInfo),
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics := []telegraf.Metric{}
|
var metrics []telegraf.Metric
|
||||||
|
|
||||||
for _, op := range test {
|
for _, op := range test {
|
||||||
switch o := (op).(type) {
|
switch o := (op).(type) {
|
||||||
case OperationAdd:
|
case OperationAdd:
|
||||||
|
|
@ -637,7 +631,6 @@ func TestPush(t *testing.T) {
|
||||||
"an empty ReceivedData does not generate any metric": {
|
"an empty ReceivedData does not generate any metric": {
|
||||||
ReceivedData: map[uint64]lldInfo{},
|
ReceivedData: map[uint64]lldInfo{},
|
||||||
PreviousReceivedData: map[uint64]lldInfo{},
|
PreviousReceivedData: map[uint64]lldInfo{},
|
||||||
Metrics: []telegraf.Metric{},
|
|
||||||
},
|
},
|
||||||
"simple one host with one lld with one set of values": {
|
"simple one host with one lld with one set of values": {
|
||||||
ReceivedData: map[uint64]lldInfo{
|
ReceivedData: map[uint64]lldInfo{
|
||||||
|
|
@ -814,7 +807,6 @@ func TestPush(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Metrics: []telegraf.Metric{},
|
|
||||||
},
|
},
|
||||||
"send an empty LLD if one metric has stopped being sent": {
|
"send an empty LLD if one metric has stopped being sent": {
|
||||||
ReceivedData: map[uint64]lldInfo{},
|
ReceivedData: map[uint64]lldInfo{},
|
||||||
|
|
|
||||||
|
|
@ -179,7 +179,6 @@ func TestZabbix(t *testing.T) {
|
||||||
time.Unix(1522082244, 0),
|
time.Unix(1522082244, 0),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
zabbixMetrics: []zabbixRequestData{},
|
|
||||||
},
|
},
|
||||||
"metrics without host tag use the system hostname": {
|
"metrics without host tag use the system hostname": {
|
||||||
telegrafMetrics: []telegraf.Metric{
|
telegrafMetrics: []telegraf.Metric{
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue