From d125281241125dd082b992d1d9b10146308e4dc5 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Mon, 27 Jan 2025 19:35:23 +0100 Subject: [PATCH] test(outputs.azure_monitor): Cleanup tests and add a unit-test for time-limit handling (#16429) --- .../azure_monitor/azure_monitor_test.go | 551 ++++++++++++------ 1 file changed, 375 insertions(+), 176 deletions(-) diff --git a/plugins/outputs/azure_monitor/azure_monitor_test.go b/plugins/outputs/azure_monitor/azure_monitor_test.go index dbdec68da..6df847cd7 100644 --- a/plugins/outputs/azure_monitor/azure_monitor_test.go +++ b/plugins/outputs/azure_monitor/azure_monitor_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" @@ -14,25 +15,22 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) func TestAggregate(t *testing.T) { tests := []struct { - name string - plugin *AzureMonitor - metrics []telegraf.Metric - addTime time.Time - pushTime time.Time - check func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) + name string + stringdim bool + metrics []telegraf.Metric + addTime time.Time + pushTime time.Time + expected []telegraf.Metric + expectedOutsideWindow int64 }{ { name: "add metric outside window is dropped", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - }, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu", @@ -43,20 +41,12 @@ func TestAggregate(t *testing.T) { time.Unix(0, 0), ), }, - addTime: time.Unix(3600, 0), - pushTime: time.Unix(3600, 0), - check: func(t *testing.T, plugin *AzureMonitor, metrics []telegraf.Metric) { - require.Equal(t, int64(1), plugin.MetricOutsideWindow.Get()) - require.Empty(t, metrics) - }, + addTime: time.Unix(3600, 0), + pushTime: time.Unix(3600, 0), + expectedOutsideWindow: 1, }, { name: "metric not sent until period expires", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - }, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu", @@ -69,18 +59,10 @@ func TestAggregate(t *testing.T) { }, addTime: time.Unix(0, 0), pushTime: time.Unix(0, 0), - check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) { - require.Empty(t, metrics) - }, }, { - name: "add strings as dimensions", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - StringsAsDimensions: true, - Log: testutil.Logger{}, - }, + name: "add strings as dimensions", + stringdim: true, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu", @@ -96,34 +78,25 @@ func TestAggregate(t *testing.T) { }, addTime: time.Unix(0, 0), pushTime: time.Unix(3600, 0), - check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) { - expected := []telegraf.Metric{ - testutil.MustMetric( - "cpu-value", - map[string]string{ - "host": "localhost", - "message": "howdy", - }, - map[string]interface{}{ - "min": 42.0, - "max": 42.0, - "sum": 42.0, - "count": 1, - }, - time.Unix(0, 0), - ), - } - testutil.RequireMetricsEqual(t, expected, metrics) + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{ + "host": "localhost", + "message": "howdy", + }, + map[string]interface{}{ + "min": 42.0, + "max": 42.0, + "sum": 42.0, + "count": 1, + }, + time.Unix(0, 0), + ), }, }, { name: "add metric to cache and push", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - cache: make(map[time.Time]map[uint64]*aggregate, 36), - }, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu", @@ -136,32 +109,22 @@ func TestAggregate(t *testing.T) { }, addTime: time.Unix(0, 0), pushTime: time.Unix(3600, 0), - check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) { - expected := []telegraf.Metric{ - testutil.MustMetric( - "cpu-value", - map[string]string{}, - map[string]interface{}{ - "min": 42.0, - "max": 42.0, - "sum": 42.0, - "count": 1, - }, - time.Unix(0, 0), - ), - } - - testutil.RequireMetricsEqual(t, expected, metrics) + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{}, + map[string]interface{}{ + "min": 42.0, + "max": 42.0, + "sum": 42.0, + "count": 1, + }, + time.Unix(0, 0), + ), }, }, { name: "added metric are aggregated", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - cache: make(map[time.Time]map[uint64]*aggregate, 36), - }, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu", @@ -190,22 +153,18 @@ func TestAggregate(t *testing.T) { }, addTime: time.Unix(0, 0), pushTime: time.Unix(3600, 0), - check: func(t *testing.T, _ *AzureMonitor, metrics []telegraf.Metric) { - expected := []telegraf.Metric{ - testutil.MustMetric( - "cpu-value", - map[string]string{}, - map[string]interface{}{ - "min": 2.0, - "max": 84.0, - "sum": 128.0, - "count": 3, - }, - time.Unix(0, 0), - ), - } - - testutil.RequireMetricsEqual(t, expected, metrics) + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu-value", + map[string]string{}, + map[string]interface{}{ + "min": 2.0, + "max": 84.0, + "sum": 128.0, + "count": 3, + }, + time.Unix(0, 0), + ), }, }, } @@ -213,24 +172,34 @@ func TestAggregate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { msiEndpoint, err := adal.GetMSIVMEndpoint() require.NoError(t, err) - t.Setenv("MSI_ENDPOINT", msiEndpoint) - err = tt.plugin.Connect() - require.NoError(t, err) - // Reset globals - tt.plugin.MetricOutsideWindow.Set(0) + // Setup plugin + plugin := &AzureMonitor{ + Region: "test", + ResourceID: "/test", + StringsAsDimensions: tt.stringdim, + Log: testutil.Logger{}, + timeFunc: func() time.Time { return tt.addTime }, + } + require.NoError(t, plugin.Connect()) - tt.plugin.timeFunc = func() time.Time { return tt.addTime } + // Reset statistics + plugin.MetricOutsideWindow.Set(0) + + // Add the data for _, m := range tt.metrics { - tt.plugin.Add(m) + plugin.Add(m) } - tt.plugin.timeFunc = func() time.Time { return tt.pushTime } - metrics := tt.plugin.Push() - tt.plugin.Reset() + // Push out the data at a later time + plugin.timeFunc = func() time.Time { return tt.pushTime } + metrics := plugin.Push() + plugin.Reset() - tt.check(t, tt.plugin, metrics) + // Check the results + require.Equal(t, tt.expectedOutsideWindow, plugin.MetricOutsideWindow.Get()) + testutil.RequireMetricsEqual(t, tt.expected, metrics) }) } } @@ -243,45 +212,15 @@ func TestWrite(t *testing.T) { t.Setenv("AZURE_USERNAME", "fake") t.Setenv("AZURE_PASSWORD", "fake") - readBody := func(r *http.Request) ([]*azureMonitorMetric, error) { - gz, err := gzip.NewReader(r.Body) - if err != nil { - return nil, err - } - scanner := bufio.NewScanner(gz) - - azmetrics := make([]*azureMonitorMetric, 0) - for scanner.Scan() { - line := scanner.Text() - var amm azureMonitorMetric - err = json.Unmarshal([]byte(line), &amm) - if err != nil { - return nil, err - } - azmetrics = append(azmetrics, &amm) - } - - return azmetrics, nil - } - - ts := httptest.NewServer(http.NotFoundHandler()) - defer ts.Close() - - url := "http://" + ts.Listener.Addr().String() + "/metrics" - tests := []struct { - name string - plugin *AzureMonitor - metrics []telegraf.Metric - handler func(t *testing.T, w http.ResponseWriter, r *http.Request) + name string + metrics []telegraf.Metric + expectedCalls uint64 + expectedMetrics uint64 + errmsg string }{ { name: "if not an azure metric nothing is sent", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - }, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu", @@ -292,17 +231,9 @@ func TestWrite(t *testing.T) { time.Unix(0, 0), ), }, - handler: func(t *testing.T, _ http.ResponseWriter, _ *http.Request) { - t.Fatal("should not call") - }, }, { name: "single azure metric", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - }, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu-value", @@ -316,20 +247,11 @@ func TestWrite(t *testing.T) { time.Unix(0, 0), ), }, - handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { - azmetrics, err := readBody(r) - require.NoError(t, err) - require.Len(t, azmetrics, 1) - w.WriteHeader(http.StatusOK) - }, + expectedCalls: 1, + expectedMetrics: 1, }, { name: "multiple azure metric", - plugin: &AzureMonitor{ - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - }, metrics: []telegraf.Metric{ testutil.MustMetric( "cpu-value", @@ -354,29 +276,306 @@ func TestWrite(t *testing.T) { time.Unix(60, 0), ), }, - handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { - azmetrics, err := readBody(r) - require.NoError(t, err) - require.Len(t, azmetrics, 2) - w.WriteHeader(http.StatusOK) - }, + expectedCalls: 1, + expectedMetrics: 2, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tt.handler(t, w, r) - }) + // Setup test server to collect the sent metrics + var calls atomic.Uint64 + var metrics atomic.Uint64 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls.Add(1) - err := tt.plugin.Connect() - require.NoError(t, err) - - // override real authorizer and write url - tt.plugin.auth = autorest.NullAuthorizer{} - tt.plugin.url = url - - err = tt.plugin.Write(tt.metrics) + gz, err := gzip.NewReader(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + t.Logf("cannot create gzip reader: %v", err) + t.Fail() + return + } + + scanner := bufio.NewScanner(gz) + for scanner.Scan() { + var m azureMonitorMetric + if err := json.Unmarshal(scanner.Bytes(), &m); err != nil { + w.WriteHeader(http.StatusInternalServerError) + t.Logf("cannot unmarshal JSON: %v", err) + t.Fail() + return + } + metrics.Add(1) + } + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + // Setup the plugin + plugin := AzureMonitor{ + EndpointURL: "http://" + ts.Listener.Addr().String(), + Region: "test", + ResourceID: "/test", + Log: testutil.Logger{}, + timeFunc: func() time.Time { return time.Unix(120, 0) }, + } + require.NoError(t, plugin.Connect()) + + // Override with testing setup + plugin.auth = autorest.NullAuthorizer{} + + err := plugin.Write(tt.metrics) + if tt.errmsg != "" { + require.ErrorContains(t, err, tt.errmsg) + return + } require.NoError(t, err) + require.Equal(t, tt.expectedCalls, calls.Load()) + require.Equal(t, tt.expectedMetrics, metrics.Load()) + }) + } +} + +func TestWriteTimelimits(t *testing.T) { + // Set up a fake environment for Authorizer + // This used to fake an MSI environment, but since https://github.com/Azure/go-autorest/pull/670/files it's no longer possible, + // So we fake a user/password authentication + t.Setenv("AZURE_CLIENT_ID", "fake") + t.Setenv("AZURE_USERNAME", "fake") + t.Setenv("AZURE_PASSWORD", "fake") + + // Setup input metrics + tref := time.Now().Truncate(time.Minute) + inputs := []telegraf.Metric{ + metric.New( + "cpu-value", + map[string]string{ + "status": "too old", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(-time.Hour), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "30 min in the past", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(-30*time.Minute), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "20 min in the past", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(-20*time.Minute), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "10 min in the past", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(-10*time.Minute), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "now", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref, + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "1 min in the future", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(1*time.Minute), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "2 min in the future", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(2*time.Minute), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "4 min in the future", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(4*time.Minute), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "5 min in the future", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(5*time.Minute), + ), + metric.New( + "cpu-value", + map[string]string{ + "status": "too far in the future", + }, + map[string]interface{}{ + "min": float64(42), + "max": float64(42), + "sum": float64(42), + "count": int64(1), + }, + tref.Add(time.Hour), + ), + } + + // Error message for status 400 + msg := `{"error":{"code":"BadRequest","message":"'time' should not be older than 30 minutes and not more than 4 minutes in the future\r\n"}}` + + tests := []struct { + name string + input []telegraf.Metric + limitPast time.Duration + limitFuture time.Duration + expectedCount int + expectedError string + }{ + { + name: "only good metrics", + input: inputs[1 : len(inputs)-2], + limitPast: 48 * time.Hour, + limitFuture: 48 * time.Hour, + expectedCount: len(inputs) - 3, + }, + { + name: "metrics out of bounds", + input: inputs, + limitPast: 48 * time.Hour, + limitFuture: 48 * time.Hour, + expectedCount: len(inputs), + expectedError: "400 Bad Request: " + msg, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Counter for the number of received metrics + var count atomic.Int32 + + // Setup test server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + reader, err := gzip.NewReader(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + t.Logf("unzipping content failed: %v", err) + t.Fail() + return + } + defer reader.Close() + + status := http.StatusOK + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + var data map[string]interface{} + if err := json.Unmarshal(scanner.Bytes(), &data); err != nil { + w.WriteHeader(http.StatusInternalServerError) + t.Logf("decoding JSON failed: %v", err) + t.Fail() + return + } + + timestamp, err := time.Parse(time.RFC3339, data["time"].(string)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + t.Logf("decoding time failed: %v", err) + t.Fail() + return + } + if timestamp.Before(tref.Add(-30*time.Minute)) || timestamp.After(tref.Add(5*time.Minute)) { + status = http.StatusBadRequest + } + count.Add(1) + } + w.WriteHeader(status) + if status == 400 { + //nolint:errcheck // Ignoring returned error as it is not relevant for the test + w.Write([]byte(msg)) + } + })) + defer ts.Close() + + // Setup plugin + plugin := AzureMonitor{ + EndpointURL: "http://" + ts.Listener.Addr().String(), + Region: "test", + ResourceID: "/test", + Log: testutil.Logger{}, + timeFunc: func() time.Time { return tref }, + } + require.NoError(t, plugin.Connect()) + + // Override with testing setup + plugin.auth = autorest.NullAuthorizer{} + + // Test writing + err := plugin.Write(tt.input) + if tt.expectedError == "" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tt.expectedError) + } + require.Equal(t, tt.expectedCount, int(count.Load())) }) } }