diff --git a/plugins/outputs/azure_monitor/README.md b/plugins/outputs/azure_monitor/README.md index de16fb83d..d6955f1a3 100644 --- a/plugins/outputs/azure_monitor/README.md +++ b/plugins/outputs/azure_monitor/README.md @@ -8,6 +8,8 @@ them to the service on every flush interval. > [!IMPORTANT] > The Azure Monitor custom metrics service is currently in preview and might > not be available in all Azure regions. +> Please also take the [metric time limitations](#metric-time-limitations) into +> account! The metrics from each input plugin will be written to a separate Azure Monitor namespace, prefixed with `Telegraf/` by default. The field name for each metric @@ -15,14 +17,6 @@ is written as the Azure Monitor metric name. All field values are written as a summarized set that includes: min, max, sum, count. Tags are written as a dimension on each Azure Monitor metric. -> [!NOTE] -> Azure Monitor won't accept metrics that are too far in the past or future. -> Keep this in mind when configuring your output buffer limits or other -> variables, such as flush intervals, or when using input sources that could -> cause metrics to be out of this allowed range. -> Currently, the timestamp should not be older than 30 minutes or more than -> 4 minutes in the future at the time when it is sent to Azure Monitor service. - ⭐ Telegraf v1.8.0 🏷️ cloud, datastore 💻 all @@ -70,6 +64,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## cloud environment, set the appropriate REST endpoint for receiving ## metrics. (Note: region may be unused in this context) # endpoint_url = "https://monitoring.core.usgovcloudapi.net" + + ## Time limitations of metric to send + ## Documentation can be found here: + ## https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp + ## However, the returned (400) error message might document more strict or + ## relaxed settings. By default, only past metrics within the limit are sent. 
+ # timestamp_limit_past = "30m" + # timestamp_limit_future = "-1m" ``` ## Setup @@ -175,3 +177,30 @@ modifiers][conf-modifiers] to limit the string-typed fields that are sent to the plugin. [conf-modifiers]: ../../../docs/CONFIGURATION.md#modifiers + +## Metric time limitations + +Azure Monitor won't accept metrics too far in the past or future. Keep this in +mind when configuring your output buffer limits or other variables, such as +flush intervals, or when using input sources that could cause metrics to be +out of this allowed range. + +According to the [documentation][timestamp_docs], the timestamp should not be +older than 20 minutes or more than 5 minutes in the future at the time when the +metric is sent to the Azure Monitor service. However, HTTP `400` error messages +returned by the service might specify other values such as 30 minutes in the +past and 4 minutes in the future. + +You can control the timeframe actually sent using the `timestamp_limit_past` and +`timestamp_limit_future` settings. By default only metrics between 30 minutes +and up to one minute in the past are sent. The lower limit represents the more +permissive limit received in the `400` error messages. The upper limit leaves +enough time for aggregation to happen by not sending aggregations too early. + +> [!IMPORTANT] +> When adapting the limit you need to take the limits permitted by the service +> as well as latency when sending metrics into account. Furthermore, you should +> not send metrics too early as in this case aggregation might not happen and +> values are misleading. 
+ +[timestamp_docs]: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp diff --git a/plugins/outputs/azure_monitor/azure_monitor.go b/plugins/outputs/azure_monitor/azure_monitor.go index 880b528fd..25aee617e 100644 --- a/plugins/outputs/azure_monitor/azure_monitor.go +++ b/plugins/outputs/azure_monitor/azure_monitor.go @@ -56,13 +56,15 @@ type aggregate struct { } type AzureMonitor struct { - Timeout config.Duration `toml:"timeout"` - NamespacePrefix string `toml:"namespace_prefix"` - StringsAsDimensions bool `toml:"strings_as_dimensions"` - Region string `toml:"region"` - ResourceID string `toml:"resource_id"` - EndpointURL string `toml:"endpoint_url"` - Log telegraf.Logger `toml:"-"` + Timeout config.Duration `toml:"timeout"` + NamespacePrefix string `toml:"namespace_prefix"` + StringsAsDimensions bool `toml:"strings_as_dimensions"` + Region string `toml:"region"` + ResourceID string `toml:"resource_id"` + EndpointURL string `toml:"endpoint_url"` + TimestampLimitPast config.Duration `toml:"timestamp_limit_past"` + TimestampLimitFuture config.Duration `toml:"timestamp_limit_future"` + Log telegraf.Logger `toml:"-"` url string preparer autorest.Preparer @@ -91,6 +93,13 @@ func (a *AzureMonitor) Init() error { } func (a *AzureMonitor) Connect() error { + a.client = &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }, + Timeout: time.Duration(a.Timeout), + } + // If information is missing try to retrieve it from the Azure VM instance if a.Region == "" || a.ResourceID == "" { region, resourceID, err := vmInstanceMetadata(a.client) @@ -119,6 +128,7 @@ func (a *AzureMonitor) Connect() error { } else { a.url = a.EndpointURL + a.ResourceID + "/metrics" } + a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url) a.MetricOutsideWindow = selfstat.Register( "azure_monitor", @@ -129,15 +139,6 @@ func (a *AzureMonitor) Connect() error { }, ) - a.Log.Debugf("Writing to Azure Monitor 
URL: %s", a.url) - - a.client = &http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - }, - Timeout: time.Duration(a.Timeout), - } - a.Reset() return nil @@ -155,7 +156,7 @@ func (a *AzureMonitor) Add(m telegraf.Metric) { // Azure Monitor only supports aggregates 30 minutes into the past and 4 // minutes into the future. Future metrics are dropped when pushed. tbucket := m.Time().Truncate(time.Minute) - if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) { + if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) { a.MetricOutsideWindow.Incr(1) return } @@ -226,7 +227,7 @@ func (a *AzureMonitor) Push() []telegraf.Metric { var metrics []telegraf.Metric for tbucket, aggs := range a.cache { // Do not send metrics early - if tbucket.After(a.timeFunc().Add(-time.Minute)) { + if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) { continue } for _, agg := range aggs { @@ -261,13 +262,13 @@ func (a *AzureMonitor) Push() []telegraf.Metric { func (a *AzureMonitor) Reset() { for tbucket := range a.cache { // Remove aggregates older than 30 minutes - if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) { + if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) { delete(a.cache, tbucket) continue } // Metrics updated within the latest 1m have not been pushed and should // not be cleared. 
- if tbucket.After(a.timeFunc().Add(-1 * time.Minute)) { + if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) { continue } for id := range a.cache[tbucket] { @@ -278,45 +279,80 @@ // Write writes metrics to the remote endpoint func (a *AzureMonitor) Write(metrics []telegraf.Metric) error { + now := a.timeFunc() + tsEarliest := now.Add(-time.Duration(a.TimestampLimitPast)) + tsLatest := now.Add(time.Duration(a.TimestampLimitFuture)) + + writeErr := &internal.PartialWriteError{ + MetricsAccept: make([]int, 0, len(metrics)), + } azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics)) - for _, m := range metrics { + for i, m := range metrics { + // Skip metrics that are outside of the valid timespan + if m.Time().Before(tsEarliest) || m.Time().After(tsLatest) { + a.Log.Tracef("Metric outside acceptable time window: %v", m) + a.MetricOutsideWindow.Incr(1) + writeErr.Err = errors.New("metric(s) outside of acceptable time window") + writeErr.MetricsReject = append(writeErr.MetricsReject, i) + continue + } + amm, err := translate(m, a.NamespacePrefix) if err != nil { a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name()) + if writeErr.Err == nil { + writeErr.Err = errors.New("translating metric(s) failed") + } + writeErr.MetricsReject = append(writeErr.MetricsReject, i) continue } id := hashIDWithTagKeysOnly(m) if azm, ok := azmetrics[id]; !ok { azmetrics[id] = amm + azmetrics[id].index = i } else { azmetrics[id].Data.BaseData.Series = append( azm.Data.BaseData.Series, amm.Data.BaseData.Series..., ) + azmetrics[id].index = i } } if len(azmetrics) == 0 { - return nil + if writeErr.Err == nil { + return nil + } + return writeErr } var buffer bytes.Buffer buffer.Grow(maxRequestBodySize) + batchIndices := make([]int, 0, len(azmetrics)) for _, m := range azmetrics { // Azure Monitor accepts new batches of points in new-line delimited // JSON, following RFC 4288 (see 
https://github.com/ndjson/ndjson-spec). buf, err := json.Marshal(m) if err != nil { - a.Log.Errorf("Could not marshall metric to JSON: %v", err) + writeErr.MetricsReject = append(writeErr.MetricsReject, m.index) + writeErr.Err = err continue } + batchIndices = append(batchIndices, m.index) + // Azure Monitor's maximum request body size of 4MB. Send batches that // exceed this size via separate write requests. if buffer.Len()+len(buf)+1 > maxRequestBodySize { - if err := a.send(buffer.Bytes()); err != nil { - return err + if retryable, err := a.send(buffer.Bytes()); err != nil { + writeErr.Err = err + if !retryable { + writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...) + } + return writeErr } + writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...) + batchIndices = make([]int, 0, len(azmetrics)) buffer.Reset() } if _, err := buffer.Write(buf); err != nil { @@ -327,22 +363,35 @@ func (a *AzureMonitor) Write(metrics []telegraf.Metric) error { } } - return a.send(buffer.Bytes()) + if retryable, err := a.send(buffer.Bytes()); err != nil { + writeErr.Err = err + if !retryable { + writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...) + } + return writeErr + } + writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...) 
+ + if writeErr.Err == nil { + return nil + } + + return writeErr } -func (a *AzureMonitor) send(body []byte) error { +func (a *AzureMonitor) send(body []byte) (bool, error) { var buf bytes.Buffer g := gzip.NewWriter(&buf) if _, err := g.Write(body); err != nil { - return fmt.Errorf("zipping content failed: %w", err) + return false, fmt.Errorf("zipping content failed: %w", err) } if err := g.Close(); err != nil { - return err + return false, fmt.Errorf("closing gzip writer failed: %w", err) } req, err := http.NewRequest("POST", a.url, &buf) if err != nil { - return fmt.Errorf("creating request failed: %w", err) + return false, fmt.Errorf("creating request failed: %w", err) } req.Header.Set("Content-Encoding", "gzip") @@ -352,7 +401,7 @@ func (a *AzureMonitor) send(body []byte) error { // refresh the token if needed. req, err = a.preparer.Prepare(req) if err != nil { - return fmt.Errorf("unable to fetch authentication credentials: %w", err) + return false, fmt.Errorf("unable to fetch authentication credentials: %w", err) } resp, err := a.client.Do(req) @@ -366,19 +415,20 @@ func (a *AzureMonitor) send(body []byte) error { Timeout: time.Duration(a.Timeout), } } - return err + return true, err } defer resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode <= 299 { - return nil + return false, nil } + retryable := resp.StatusCode != 400 if respbody, err := io.ReadAll(resp.Body); err == nil { - return fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody)) + return retryable, fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody)) } - return fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status) + return retryable, fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status) } // vmMetadata retrieves metadata about the current Azure VM @@ -533,12 +583,15 @@ func getIntField(m telegraf.Metric, key string) (int64, error) { } return 0, 
fmt.Errorf("unexpected type: %s: %T", key, fv) } + func init() { outputs.Add("azure_monitor", func() telegraf.Output { return &AzureMonitor{ - NamespacePrefix: "Telegraf/", - Timeout: config.Duration(5 * time.Second), - timeFunc: time.Now, + NamespacePrefix: "Telegraf/", + TimestampLimitPast: config.Duration(30 * time.Minute), + TimestampLimitFuture: config.Duration(-1 * time.Minute), + Timeout: config.Duration(5 * time.Second), + timeFunc: time.Now, } }) } diff --git a/plugins/outputs/azure_monitor/azure_monitor_test.go b/plugins/outputs/azure_monitor/azure_monitor_test.go index ce2d97b8f..1f7a9c1c5 100644 --- a/plugins/outputs/azure_monitor/azure_monitor_test.go +++ b/plugins/outputs/azure_monitor/azure_monitor_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -176,11 +177,13 @@ func TestAggregate(t *testing.T) { // Setup plugin plugin := &AzureMonitor{ - Region: "test", - ResourceID: "/test", - StringsAsDimensions: tt.stringdim, - Log: testutil.Logger{}, - timeFunc: func() time.Time { return tt.addTime }, + Region: "test", + ResourceID: "/test", + StringsAsDimensions: tt.stringdim, + TimestampLimitPast: config.Duration(30 * time.Minute), + TimestampLimitFuture: config.Duration(-1 * time.Minute), + Log: testutil.Logger{}, + timeFunc: func() time.Time { return tt.addTime }, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -233,6 +236,7 @@ func TestWrite(t *testing.T) { time.Unix(0, 0), ), }, + errmsg: "translating metric(s) failed", }, { name: "single azure metric", @@ -315,11 +319,13 @@ func TestWrite(t *testing.T) { // Setup the plugin plugin := AzureMonitor{ - EndpointURL: "http://" + ts.Listener.Addr().String(), - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - timeFunc: func() time.Time { return time.Unix(120, 0) }, + EndpointURL: 
"http://" + ts.Listener.Addr().String(), + Region: "test", + ResourceID: "/test", + TimestampLimitPast: config.Duration(30 * time.Minute), + TimestampLimitFuture: config.Duration(-1 * time.Minute), + Log: testutil.Logger{}, + timeFunc: func() time.Time { return time.Unix(120, 0) }, } require.NoError(t, plugin.Init()) @@ -328,9 +334,6 @@ func TestWrite(t *testing.T) { require.NoError(t, plugin.Connect()) defer plugin.Close() - // Override with testing setup - plugin.preparer = autorest.CreatePreparer(autorest.NullAuthorizer{}.WithAuthorization()) - err := plugin.Write(tt.metrics) if tt.errmsg != "" { require.ErrorContains(t, err, tt.errmsg) @@ -512,6 +515,30 @@ func TestWriteTimelimits(t *testing.T) { expectedCount: len(inputs), expectedError: "400 Bad Request: " + msg, }, + { + name: "default limit", + input: inputs, + limitPast: 20 * time.Minute, + limitFuture: -1 * time.Minute, + expectedCount: 2, + expectedError: "metric(s) outside of acceptable time window", + }, + { + name: "permissive limit", + input: inputs, + limitPast: 30 * time.Minute, + limitFuture: 5 * time.Minute, + expectedCount: len(inputs) - 2, + expectedError: "metric(s) outside of acceptable time window", + }, + { + name: "very strict", + input: inputs, + limitPast: 19*time.Minute + 59*time.Second, + limitFuture: 3*time.Minute + 59*time.Second, + expectedCount: len(inputs) - 6, + expectedError: "metric(s) outside of acceptable time window", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -564,11 +591,13 @@ func TestWriteTimelimits(t *testing.T) { // Setup plugin plugin := AzureMonitor{ - EndpointURL: "http://" + ts.Listener.Addr().String(), - Region: "test", - ResourceID: "/test", - Log: testutil.Logger{}, - timeFunc: func() time.Time { return tref }, + EndpointURL: "http://" + ts.Listener.Addr().String(), + Region: "test", + ResourceID: "/test", + TimestampLimitPast: config.Duration(tt.limitPast), + TimestampLimitFuture: config.Duration(tt.limitFuture), + Log: 
testutil.Logger{}, + timeFunc: func() time.Time { return tref }, } require.NoError(t, plugin.Init()) diff --git a/plugins/outputs/azure_monitor/sample.conf b/plugins/outputs/azure_monitor/sample.conf index cee8539fe..ca247c787 100644 --- a/plugins/outputs/azure_monitor/sample.conf +++ b/plugins/outputs/azure_monitor/sample.conf @@ -27,3 +27,11 @@ ## cloud environment, set the appropriate REST endpoint for receiving ## metrics. (Note: region may be unused in this context) # endpoint_url = "https://monitoring.core.usgovcloudapi.net" + + ## Time limitations of metric to send + ## Documentation can be found here: + ## https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp + ## However, the returned (400) error message might document more strict or + ## relaxed settings. By default, only past metrics within the limit are sent. + # timestamp_limit_past = "30m" + # timestamp_limit_future = "-1m" \ No newline at end of file diff --git a/plugins/outputs/azure_monitor/types.go b/plugins/outputs/azure_monitor/types.go index 212923d8f..6eac54ec2 100644 --- a/plugins/outputs/azure_monitor/types.go +++ b/plugins/outputs/azure_monitor/types.go @@ -6,8 +6,9 @@ import ( ) type azureMonitorMetric struct { - Time time.Time `json:"time"` - Data *azureMonitorData `json:"data"` + Time time.Time `json:"time"` + Data *azureMonitorData `json:"data"` + index int } type azureMonitorData struct {