test(outputs.azure_monitor): Cleanup tests and add a unit-test for time-limit handling (#16429)
This commit is contained in:
parent
25de05a8fc
commit
d125281241
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -14,25 +15,22 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAggregate(t *testing.T) {
|
func TestAggregate(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
plugin *AzureMonitor
|
stringdim bool
|
||||||
metrics []telegraf.Metric
|
metrics []telegraf.Metric
|
||||||
addTime time.Time
|
addTime time.Time
|
||||||
pushTime time.Time
|
pushTime time.Time
|
||||||
check func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric)
|
expected []telegraf.Metric
|
||||||
|
expectedOutsideWindow int64
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "add metric outside window is dropped",
|
name: "add metric outside window is dropped",
|
||||||
plugin: &AzureMonitor{
|
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -43,20 +41,12 @@ func TestAggregate(t *testing.T) {
|
||||||
time.Unix(0, 0),
|
time.Unix(0, 0),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
addTime: time.Unix(3600, 0),
|
addTime: time.Unix(3600, 0),
|
||||||
pushTime: time.Unix(3600, 0),
|
pushTime: time.Unix(3600, 0),
|
||||||
check: func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) {
|
expectedOutsideWindow: 1,
|
||||||
require.Equal(t, int64(1), plugin.MetricOutsideWindow.Get())
|
|
||||||
require.Empty(t, metrics)
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "metric not sent until period expires",
|
name: "metric not sent until period expires",
|
||||||
plugin: &AzureMonitor{
|
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -69,18 +59,10 @@ func TestAggregate(t *testing.T) {
|
||||||
},
|
},
|
||||||
addTime: time.Unix(0, 0),
|
addTime: time.Unix(0, 0),
|
||||||
pushTime: time.Unix(0, 0),
|
pushTime: time.Unix(0, 0),
|
||||||
check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) {
|
|
||||||
require.Empty(t, metrics)
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "add strings as dimensions",
|
name: "add strings as dimensions",
|
||||||
plugin: &AzureMonitor{
|
stringdim: true,
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
StringsAsDimensions: true,
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -96,34 +78,25 @@ func TestAggregate(t *testing.T) {
|
||||||
},
|
},
|
||||||
addTime: time.Unix(0, 0),
|
addTime: time.Unix(0, 0),
|
||||||
pushTime: time.Unix(3600, 0),
|
pushTime: time.Unix(3600, 0),
|
||||||
check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) {
|
expected: []telegraf.Metric{
|
||||||
expected := []telegraf.Metric{
|
testutil.MustMetric(
|
||||||
testutil.MustMetric(
|
"cpu-value",
|
||||||
"cpu-value",
|
map[string]string{
|
||||||
map[string]string{
|
"host": "localhost",
|
||||||
"host": "localhost",
|
"message": "howdy",
|
||||||
"message": "howdy",
|
},
|
||||||
},
|
map[string]interface{}{
|
||||||
map[string]interface{}{
|
"min": 42.0,
|
||||||
"min": 42.0,
|
"max": 42.0,
|
||||||
"max": 42.0,
|
"sum": 42.0,
|
||||||
"sum": 42.0,
|
"count": 1,
|
||||||
"count": 1,
|
},
|
||||||
},
|
time.Unix(0, 0),
|
||||||
time.Unix(0, 0),
|
),
|
||||||
),
|
|
||||||
}
|
|
||||||
testutil.RequireMetricsEqual(t, expected, metrics)
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "add metric to cache and push",
|
name: "add metric to cache and push",
|
||||||
plugin: &AzureMonitor{
|
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
cache: make(map[time.Time]map[uint64]*aggregate, 36),
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -136,32 +109,22 @@ func TestAggregate(t *testing.T) {
|
||||||
},
|
},
|
||||||
addTime: time.Unix(0, 0),
|
addTime: time.Unix(0, 0),
|
||||||
pushTime: time.Unix(3600, 0),
|
pushTime: time.Unix(3600, 0),
|
||||||
check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) {
|
expected: []telegraf.Metric{
|
||||||
expected := []telegraf.Metric{
|
testutil.MustMetric(
|
||||||
testutil.MustMetric(
|
"cpu-value",
|
||||||
"cpu-value",
|
map[string]string{},
|
||||||
map[string]string{},
|
map[string]interface{}{
|
||||||
map[string]interface{}{
|
"min": 42.0,
|
||||||
"min": 42.0,
|
"max": 42.0,
|
||||||
"max": 42.0,
|
"sum": 42.0,
|
||||||
"sum": 42.0,
|
"count": 1,
|
||||||
"count": 1,
|
},
|
||||||
},
|
time.Unix(0, 0),
|
||||||
time.Unix(0, 0),
|
),
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
testutil.RequireMetricsEqual(t, expected, metrics)
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "added metric are aggregated",
|
name: "added metric are aggregated",
|
||||||
plugin: &AzureMonitor{
|
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
cache: make(map[time.Time]map[uint64]*aggregate, 36),
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -190,22 +153,18 @@ func TestAggregate(t *testing.T) {
|
||||||
},
|
},
|
||||||
addTime: time.Unix(0, 0),
|
addTime: time.Unix(0, 0),
|
||||||
pushTime: time.Unix(3600, 0),
|
pushTime: time.Unix(3600, 0),
|
||||||
check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) {
|
expected: []telegraf.Metric{
|
||||||
expected := []telegraf.Metric{
|
testutil.MustMetric(
|
||||||
testutil.MustMetric(
|
"cpu-value",
|
||||||
"cpu-value",
|
map[string]string{},
|
||||||
map[string]string{},
|
map[string]interface{}{
|
||||||
map[string]interface{}{
|
"min": 2.0,
|
||||||
"min": 2.0,
|
"max": 84.0,
|
||||||
"max": 84.0,
|
"sum": 128.0,
|
||||||
"sum": 128.0,
|
"count": 3,
|
||||||
"count": 3,
|
},
|
||||||
},
|
time.Unix(0, 0),
|
||||||
time.Unix(0, 0),
|
),
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
testutil.RequireMetricsEqual(t, expected, metrics)
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -213,24 +172,34 @@ func TestAggregate(t *testing.T) {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
msiEndpoint, err := adal.GetMSIVMEndpoint()
|
msiEndpoint, err := adal.GetMSIVMEndpoint()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
t.Setenv("MSI_ENDPOINT", msiEndpoint)
|
t.Setenv("MSI_ENDPOINT", msiEndpoint)
|
||||||
err = tt.plugin.Connect()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Reset globals
|
// Setup plugin
|
||||||
tt.plugin.MetricOutsideWindow.Set(0)
|
plugin := &AzureMonitor{
|
||||||
|
Region: "test",
|
||||||
|
ResourceID: "/test",
|
||||||
|
StringsAsDimensions: tt.stringdim,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
timeFunc: func() time.Time { return tt.addTime },
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Connect())
|
||||||
|
|
||||||
tt.plugin.timeFunc = func() time.Time { return tt.addTime }
|
// Reset statistics
|
||||||
|
plugin.MetricOutsideWindow.Set(0)
|
||||||
|
|
||||||
|
// Add the data
|
||||||
for _, m := range tt.metrics {
|
for _, m := range tt.metrics {
|
||||||
tt.plugin.Add(m)
|
plugin.Add(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
tt.plugin.timeFunc = func() time.Time { return tt.pushTime }
|
// Push out the data at a later time
|
||||||
metrics := tt.plugin.Push()
|
plugin.timeFunc = func() time.Time { return tt.pushTime }
|
||||||
tt.plugin.Reset()
|
metrics := plugin.Push()
|
||||||
|
plugin.Reset()
|
||||||
|
|
||||||
tt.check(t, tt.plugin, metrics)
|
// Check the results
|
||||||
|
require.Equal(t, tt.expectedOutsideWindow, plugin.MetricOutsideWindow.Get())
|
||||||
|
testutil.RequireMetricsEqual(t, tt.expected, metrics)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -243,45 +212,15 @@ func TestWrite(t *testing.T) {
|
||||||
t.Setenv("AZURE_USERNAME", "fake")
|
t.Setenv("AZURE_USERNAME", "fake")
|
||||||
t.Setenv("AZURE_PASSWORD", "fake")
|
t.Setenv("AZURE_PASSWORD", "fake")
|
||||||
|
|
||||||
readBody := func(r *http.Request) ([]*azureMonitorMetric, error) {
|
|
||||||
gz, err := gzip.NewReader(r.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
scanner := bufio.NewScanner(gz)
|
|
||||||
|
|
||||||
azmetrics := make([]*azureMonitorMetric, 0)
|
|
||||||
for scanner.Scan() {
|
|
||||||
line := scanner.Text()
|
|
||||||
var amm azureMonitorMetric
|
|
||||||
err = json.Unmarshal([]byte(line), &amm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
azmetrics = append(azmetrics, &amm)
|
|
||||||
}
|
|
||||||
|
|
||||||
return azmetrics, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ts := httptest.NewServer(http.NotFoundHandler())
|
|
||||||
defer ts.Close()
|
|
||||||
|
|
||||||
url := "http://" + ts.Listener.Addr().String() + "/metrics"
|
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
plugin *AzureMonitor
|
metrics []telegraf.Metric
|
||||||
metrics []telegraf.Metric
|
expectedCalls uint64
|
||||||
handler func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
expectedMetrics uint64
|
||||||
|
errmsg string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "if not an azure metric nothing is sent",
|
name: "if not an azure metric nothing is sent",
|
||||||
plugin: &AzureMonitor{
|
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -292,17 +231,9 @@ func TestWrite(t *testing.T) {
|
||||||
time.Unix(0, 0),
|
time.Unix(0, 0),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
handler: func(t *testing.T, _ http.ResponseWriter, _ *http.Request) {
|
|
||||||
t.Fatal("should not call")
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "single azure metric",
|
name: "single azure metric",
|
||||||
plugin: &AzureMonitor{
|
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu-value",
|
"cpu-value",
|
||||||
|
|
@ -316,20 +247,11 @@ func TestWrite(t *testing.T) {
|
||||||
time.Unix(0, 0),
|
time.Unix(0, 0),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
expectedCalls: 1,
|
||||||
azmetrics, err := readBody(r)
|
expectedMetrics: 1,
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, azmetrics, 1)
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "multiple azure metric",
|
name: "multiple azure metric",
|
||||||
plugin: &AzureMonitor{
|
|
||||||
Region: "test",
|
|
||||||
ResourceID: "/test",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu-value",
|
"cpu-value",
|
||||||
|
|
@ -354,29 +276,306 @@ func TestWrite(t *testing.T) {
|
||||||
time.Unix(60, 0),
|
time.Unix(60, 0),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
expectedCalls: 1,
|
||||||
azmetrics, err := readBody(r)
|
expectedMetrics: 2,
|
||||||
require.NoError(t, err)
|
|
||||||
require.Len(t, azmetrics, 2)
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
// Setup test server to collect the sent metrics
|
||||||
tt.handler(t, w, r)
|
var calls atomic.Uint64
|
||||||
})
|
var metrics atomic.Uint64
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
calls.Add(1)
|
||||||
|
|
||||||
err := tt.plugin.Connect()
|
gz, err := gzip.NewReader(r.Body)
|
||||||
require.NoError(t, err)
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
// override real authorizer and write url
|
t.Logf("cannot create gzip reader: %v", err)
|
||||||
tt.plugin.auth = autorest.NullAuthorizer{}
|
t.Fail()
|
||||||
tt.plugin.url = url
|
return
|
||||||
|
}
|
||||||
err = tt.plugin.Write(tt.metrics)
|
|
||||||
|
scanner := bufio.NewScanner(gz)
|
||||||
|
for scanner.Scan() {
|
||||||
|
var m azureMonitorMetric
|
||||||
|
if err := json.Unmarshal(scanner.Bytes(), &m); err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
t.Logf("cannot unmarshal JSON: %v", err)
|
||||||
|
t.Fail()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
metrics.Add(1)
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
// Setup the plugin
|
||||||
|
plugin := AzureMonitor{
|
||||||
|
EndpointURL: "http://" + ts.Listener.Addr().String(),
|
||||||
|
Region: "test",
|
||||||
|
ResourceID: "/test",
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
timeFunc: func() time.Time { return time.Unix(120, 0) },
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Connect())
|
||||||
|
|
||||||
|
// Override with testing setup
|
||||||
|
plugin.auth = autorest.NullAuthorizer{}
|
||||||
|
|
||||||
|
err := plugin.Write(tt.metrics)
|
||||||
|
if tt.errmsg != "" {
|
||||||
|
require.ErrorContains(t, err, tt.errmsg)
|
||||||
|
return
|
||||||
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tt.expectedCalls, calls.Load())
|
||||||
|
require.Equal(t, tt.expectedMetrics, metrics.Load())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteTimelimits(t *testing.T) {
|
||||||
|
// Set up a fake environment for Authorizer
|
||||||
|
// This used to fake an MSI environment, but since https://github.com/Azure/go-autorest/pull/670/files it's no longer possible,
|
||||||
|
// So we fake a user/password authentication
|
||||||
|
t.Setenv("AZURE_CLIENT_ID", "fake")
|
||||||
|
t.Setenv("AZURE_USERNAME", "fake")
|
||||||
|
t.Setenv("AZURE_PASSWORD", "fake")
|
||||||
|
|
||||||
|
// Setup input metrics
|
||||||
|
tref := time.Now().Truncate(time.Minute)
|
||||||
|
inputs := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "too old",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(-time.Hour),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "30 min in the past",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(-30*time.Minute),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "20 min in the past",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(-20*time.Minute),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "10 min in the past",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(-10*time.Minute),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "now",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref,
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "1 min in the future",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(1*time.Minute),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "2 min in the future",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(2*time.Minute),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "4 min in the future",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(4*time.Minute),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "5 min in the future",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(5*time.Minute),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu-value",
|
||||||
|
map[string]string{
|
||||||
|
"status": "too far in the future",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": float64(42),
|
||||||
|
"max": float64(42),
|
||||||
|
"sum": float64(42),
|
||||||
|
"count": int64(1),
|
||||||
|
},
|
||||||
|
tref.Add(time.Hour),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error message for status 400
|
||||||
|
msg := `{"error":{"code":"BadRequest","message":"'time' should not be older than 30 minutes and not more than 4 minutes in the future\r\n"}}`
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
input []telegraf.Metric
|
||||||
|
limitPast time.Duration
|
||||||
|
limitFuture time.Duration
|
||||||
|
expectedCount int
|
||||||
|
expectedError string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "only good metrics",
|
||||||
|
input: inputs[1 : len(inputs)-2],
|
||||||
|
limitPast: 48 * time.Hour,
|
||||||
|
limitFuture: 48 * time.Hour,
|
||||||
|
expectedCount: len(inputs) - 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "metrics out of bounds",
|
||||||
|
input: inputs,
|
||||||
|
limitPast: 48 * time.Hour,
|
||||||
|
limitFuture: 48 * time.Hour,
|
||||||
|
expectedCount: len(inputs),
|
||||||
|
expectedError: "400 Bad Request: " + msg,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
// Counter for the number of received metrics
|
||||||
|
var count atomic.Int32
|
||||||
|
|
||||||
|
// Setup test server
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
defer r.Body.Close()
|
||||||
|
|
||||||
|
reader, err := gzip.NewReader(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
t.Logf("unzipping content failed: %v", err)
|
||||||
|
t.Fail()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
status := http.StatusOK
|
||||||
|
scanner := bufio.NewScanner(reader)
|
||||||
|
for scanner.Scan() {
|
||||||
|
var data map[string]interface{}
|
||||||
|
if err := json.Unmarshal(scanner.Bytes(), &data); err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
t.Logf("decoding JSON failed: %v", err)
|
||||||
|
t.Fail()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
timestamp, err := time.Parse(time.RFC3339, data["time"].(string))
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
t.Logf("decoding time failed: %v", err)
|
||||||
|
t.Fail()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if timestamp.Before(tref.Add(-30*time.Minute)) || timestamp.After(tref.Add(5*time.Minute)) {
|
||||||
|
status = http.StatusBadRequest
|
||||||
|
}
|
||||||
|
count.Add(1)
|
||||||
|
}
|
||||||
|
w.WriteHeader(status)
|
||||||
|
if status == 400 {
|
||||||
|
//nolint:errcheck // Ignoring returned error as it is not relevant for the test
|
||||||
|
w.Write([]byte(msg))
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
// Setup plugin
|
||||||
|
plugin := AzureMonitor{
|
||||||
|
EndpointURL: "http://" + ts.Listener.Addr().String(),
|
||||||
|
Region: "test",
|
||||||
|
ResourceID: "/test",
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
timeFunc: func() time.Time { return tref },
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Connect())
|
||||||
|
|
||||||
|
// Override with testing setup
|
||||||
|
plugin.auth = autorest.NullAuthorizer{}
|
||||||
|
|
||||||
|
// Test writing
|
||||||
|
err := plugin.Write(tt.input)
|
||||||
|
if tt.expectedError == "" {
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
require.ErrorContains(t, err, tt.expectedError)
|
||||||
|
}
|
||||||
|
require.Equal(t, tt.expectedCount, int(count.Load()))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue