fix(outputs.azure_monitor): Prevent infinite send loop for outdated metrics (#16448)

Sven Rebhan 2025-02-04 16:56:14 +01:00 committed by GitHub
parent ef4cabcfe8
commit 1329ae89ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 186 additions and 66 deletions


@@ -8,6 +8,8 @@ them to the service on every flush interval.
> [!IMPORTANT]
> The Azure Monitor custom metrics service is currently in preview and might
> not be available in all Azure regions.
> Please also take the [metric time limitations](#metric-time-limitations) into
> account!
The metrics from each input plugin will be written to a separate Azure Monitor
namespace, prefixed with `Telegraf/` by default. The field name for each metric
@@ -15,14 +17,6 @@ is written as the Azure Monitor metric name. All field values are written as a
summarized set that includes: min, max, sum, count. Tags are written as a
dimension on each Azure Monitor metric.
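To make the summarized set concrete, here is a minimal, self-contained Go
sketch, illustrative only and not the plugin's actual code, of how a series of
field values collapses into the min/max/sum/count aggregate:

```go
package main

import "fmt"

// aggregate mirrors the summarized set Azure Monitor stores per metric
// and time bucket: minimum, maximum, sum, and count of all samples.
type aggregate struct {
	Min, Max, Sum float64
	Count         int64
}

// add folds a single sample into the aggregate.
func (a *aggregate) add(v float64) {
	if a.Count == 0 || v < a.Min {
		a.Min = v
	}
	if a.Count == 0 || v > a.Max {
		a.Max = v
	}
	a.Sum += v
	a.Count++
}

func main() {
	var agg aggregate
	for _, v := range []float64{3, 1, 4} {
		agg.add(v)
	}
	fmt.Printf("%+v\n", agg) // prints {Min:1 Max:4 Sum:8 Count:3}
}
```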
> [!NOTE]
> Azure Monitor won't accept metrics that are too far in the past or future.
> Keep this in mind when configuring your output buffer limits or other
> variables, such as flush intervals, or when using input sources that could
> cause metrics to be out of this allowed range.
> Currently, the timestamp should not be older than 30 minutes or more than
> 4 minutes in the future at the time when it is sent to Azure Monitor service.
⭐ Telegraf v1.8.0
🏷️ cloud, datastore
💻 all
@@ -70,6 +64,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## cloud environment, set the appropriate REST endpoint for receiving
## metrics. (Note: region may be unused in this context)
# endpoint_url = "https://monitoring.core.usgovcloudapi.net"
## Time limitations of metrics to send
## Documentation can be found here:
## https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp
## However, the returned (400) error message might document stricter or
## more relaxed settings. By default, only past metrics within the limit are sent.
# timestamp_limit_past = "30m"
# timestamp_limit_future = "-1m"
```
## Setup
@@ -175,3 +177,30 @@ modifiers][conf-modifiers] to limit the string-typed fields that are sent to
the plugin.
[conf-modifiers]: ../../../docs/CONFIGURATION.md#modifiers
## Metric time limitations
Azure Monitor won't accept metrics too far in the past or future. Keep this in
mind when configuring your output buffer limits or other variables, such as
flush intervals, or when using input sources that could cause metrics to fall
outside this allowed range.
According to the [documentation][timestamp_docs], the timestamp should not be
older than 20 minutes or more than 5 minutes in the future at the time when the
metric is sent to the Azure Monitor service. However, HTTP `400` error messages
returned by the service might specify other values such as 30 minutes in the
past and 4 minutes in the future.
You can control the timeframe actually sent using the `timestamp_limit_past` and
`timestamp_limit_future` settings. By default, only metrics between 30 minutes
and one minute in the past are sent. The lower limit matches the more
permissive limit reported in the `400` error messages. The upper limit avoids
sending aggregations too early, leaving enough time for aggregation to complete.
> [!IMPORTANT]
> When adapting the limits, you need to take into account both the limits
> permitted by the service and the latency of sending metrics. Furthermore, you
> should not send metrics too early, as in that case aggregation might not have
> happened yet and the reported values would be misleading.
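As a rough illustration of the resulting window, assuming the defaults of
`timestamp_limit_past = "30m"` and `timestamp_limit_future = "-1m"`, here is a
standalone Go sketch (not plugin code) classifying a few hypothetical
timestamps:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	earliest := now.Add(-30 * time.Minute) // timestamp_limit_past (negated)
	latest := now.Add(-1 * time.Minute)    // timestamp_limit_future (added as-is)

	for _, ts := range []time.Time{
		now.Add(-45 * time.Minute), // too old: rejected, never retried
		now.Add(-10 * time.Minute), // inside the window: sent
		now,                        // too new: held until it ages past the upper bound
	} {
		switch {
		case ts.Before(earliest):
			fmt.Println(ts.Format(time.RFC3339), "-> dropped")
		case ts.After(latest):
			fmt.Println(ts.Format(time.RFC3339), "-> not sent yet")
		default:
			fmt.Println(ts.Format(time.RFC3339), "-> sent")
		}
	}
}
```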
[timestamp_docs]: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp


@@ -56,13 +56,15 @@ type aggregate struct {
}
type AzureMonitor struct {
Timeout config.Duration `toml:"timeout"`
NamespacePrefix string `toml:"namespace_prefix"`
StringsAsDimensions bool `toml:"strings_as_dimensions"`
Region string `toml:"region"`
ResourceID string `toml:"resource_id"`
EndpointURL string `toml:"endpoint_url"`
Log telegraf.Logger `toml:"-"`
Timeout config.Duration `toml:"timeout"`
NamespacePrefix string `toml:"namespace_prefix"`
StringsAsDimensions bool `toml:"strings_as_dimensions"`
Region string `toml:"region"`
ResourceID string `toml:"resource_id"`
EndpointURL string `toml:"endpoint_url"`
TimestampLimitPast config.Duration `toml:"timestamp_limit_past"`
TimestampLimitFuture config.Duration `toml:"timestamp_limit_future"`
Log telegraf.Logger `toml:"-"`
url string
preparer autorest.Preparer
@@ -91,6 +93,13 @@ func (a *AzureMonitor) Init() error {
}
func (a *AzureMonitor) Connect() error {
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(a.Timeout),
}
// If information is missing try to retrieve it from the Azure VM instance
if a.Region == "" || a.ResourceID == "" {
region, resourceID, err := vmInstanceMetadata(a.client)
@@ -119,6 +128,7 @@ func (a *AzureMonitor) Connect() error {
} else {
a.url = a.EndpointURL + a.ResourceID + "/metrics"
}
a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)
a.MetricOutsideWindow = selfstat.Register(
"azure_monitor",
@@ -129,15 +139,6 @@ func (a *AzureMonitor) Connect() error {
},
)
a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(a.Timeout),
}
a.Reset()
return nil
@@ -155,7 +156,7 @@ func (a *AzureMonitor) Add(m telegraf.Metric) {
// Azure Monitor only supports aggregates 30 minutes into the past and 4
// minutes into the future. Future metrics are dropped when pushed.
tbucket := m.Time().Truncate(time.Minute)
if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) {
if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) {
a.MetricOutsideWindow.Incr(1)
return
}
@@ -226,7 +227,7 @@ func (a *AzureMonitor) Push() []telegraf.Metric {
var metrics []telegraf.Metric
for tbucket, aggs := range a.cache {
// Do not send metrics early
if tbucket.After(a.timeFunc().Add(-time.Minute)) {
if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) {
continue
}
for _, agg := range aggs {
@@ -261,13 +262,13 @@ func (a *AzureMonitor) Push() []telegraf.Metric {
func (a *AzureMonitor) Reset() {
for tbucket := range a.cache {
// Remove aggregates older than 30 minutes
if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) {
if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) {
delete(a.cache, tbucket)
continue
}
// Metrics updated within the latest 1m have not been pushed and should
// not be cleared.
if tbucket.After(a.timeFunc().Add(-1 * time.Minute)) {
if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) {
continue
}
for id := range a.cache[tbucket] {
@@ -278,45 +279,80 @@ func (a *AzureMonitor) Reset() {
// Write writes metrics to the remote endpoint
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
now := a.timeFunc()
tsEarliest := now.Add(-time.Duration(a.TimestampLimitPast))
tsLatest := now.Add(time.Duration(a.TimestampLimitFuture))
writeErr := &internal.PartialWriteError{
MetricsAccept: make([]int, 0, len(metrics)),
}
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
for _, m := range metrics {
for i, m := range metrics {
// Skip metrics that are outside of the valid timespan
if m.Time().Before(tsEarliest) || m.Time().After(tsLatest) {
a.Log.Tracef("Metric outside acceptable time window: %v", m)
a.MetricOutsideWindow.Incr(1)
writeErr.Err = errors.New("metric(s) outside of acceptable time window")
writeErr.MetricsReject = append(writeErr.MetricsReject, i)
continue
}
amm, err := translate(m, a.NamespacePrefix)
if err != nil {
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
if writeErr.Err == nil {
writeErr.Err = errors.New("translating metric(s) failed")
}
writeErr.MetricsReject = append(writeErr.MetricsReject, i)
continue
}
id := hashIDWithTagKeysOnly(m)
if azm, ok := azmetrics[id]; !ok {
azmetrics[id] = amm
azmetrics[id].index = i
} else {
azmetrics[id].Data.BaseData.Series = append(
azm.Data.BaseData.Series,
amm.Data.BaseData.Series...,
)
azmetrics[id].index = i
}
}
if len(azmetrics) == 0 {
return nil
if writeErr.Err == nil {
return nil
}
return writeErr
}
var buffer bytes.Buffer
buffer.Grow(maxRequestBodySize)
batchIndices := make([]int, 0, len(azmetrics))
for _, m := range azmetrics {
// Azure Monitor accepts new batches of points in new-line delimited
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
buf, err := json.Marshal(m)
if err != nil {
a.Log.Errorf("Could not marshall metric to JSON: %v", err)
writeErr.MetricsReject = append(writeErr.MetricsReject, m.index)
writeErr.Err = err
continue
}
batchIndices = append(batchIndices, m.index)
// Azure Monitor's maximum request body size of 4MB. Send batches that
// exceed this size via separate write requests.
if buffer.Len()+len(buf)+1 > maxRequestBodySize {
if err := a.send(buffer.Bytes()); err != nil {
return err
if retryable, err := a.send(buffer.Bytes()); err != nil {
writeErr.Err = err
if !retryable {
writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...)
}
return writeErr
}
writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...)
batchIndices = make([]int, 0, len(azmetrics))
buffer.Reset()
}
if _, err := buffer.Write(buf); err != nil {
@@ -327,22 +363,35 @@ func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
}
}
return a.send(buffer.Bytes())
if retryable, err := a.send(buffer.Bytes()); err != nil {
writeErr.Err = err
if !retryable {
writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...)
}
return writeErr
}
writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...)
if writeErr.Err == nil {
return nil
}
return writeErr
}
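For context on why `Write` now reports indices instead of a plain error: a
`PartialWriteError` tells the caller which metrics were delivered and which are
unsendable, so neither group is retried; that is what prevents the infinite
send loop for outdated metrics. The following standalone sketch uses a local
stand-in type, not Telegraf's actual `internal.PartialWriteError`, to show the
idea:

```go
package main

import (
	"errors"
	"fmt"
)

// partialWriteError is a local stand-in for Telegraf's
// internal.PartialWriteError: indices into the metrics slice passed to
// Write, plus the underlying error.
type partialWriteError struct {
	Err           error
	MetricsAccept []int
	MetricsReject []int
}

func (e *partialWriteError) Error() string { return e.Err.Error() }

func main() {
	metrics := []string{"m0", "m1", "m2"} // stand-ins for telegraf.Metric values

	// Write reported m0 as delivered and m2 as permanently rejected,
	// e.g. outside the acceptable time window; only m1 stays buffered
	// for the next attempt.
	err := error(&partialWriteError{
		Err:           errors.New("metric(s) outside of acceptable time window"),
		MetricsAccept: []int{0},
		MetricsReject: []int{2},
	})

	var pwe *partialWriteError
	if errors.As(err, &pwe) {
		for _, i := range pwe.MetricsAccept {
			fmt.Println("delivered, drop from buffer:", metrics[i])
		}
		for _, i := range pwe.MetricsReject {
			fmt.Println("rejected, do not retry:", metrics[i])
		}
	}
}
```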
func (a *AzureMonitor) send(body []byte) error {
func (a *AzureMonitor) send(body []byte) (bool, error) {
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
if _, err := g.Write(body); err != nil {
return fmt.Errorf("zipping content failed: %w", err)
return false, fmt.Errorf("zipping content failed: %w", err)
}
if err := g.Close(); err != nil {
return err
return false, fmt.Errorf("closing gzip writer failed: %w", err)
}
req, err := http.NewRequest("POST", a.url, &buf)
if err != nil {
return fmt.Errorf("creating request failed: %w", err)
return false, fmt.Errorf("creating request failed: %w", err)
}
req.Header.Set("Content-Encoding", "gzip")
@@ -352,7 +401,7 @@ func (a *AzureMonitor) send(body []byte) error {
// refresh the token if needed.
req, err = a.preparer.Prepare(req)
if err != nil {
return fmt.Errorf("unable to fetch authentication credentials: %w", err)
return false, fmt.Errorf("unable to fetch authentication credentials: %w", err)
}
resp, err := a.client.Do(req)
@@ -366,19 +415,20 @@ func (a *AzureMonitor) send(body []byte) error {
Timeout: time.Duration(a.Timeout),
}
}
return err
return true, err
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
return nil
return false, nil
}
retryable := resp.StatusCode != 400
if respbody, err := io.ReadAll(resp.Body); err == nil {
return fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody))
return retryable, fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody))
}
return fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status)
return retryable, fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status)
}
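The retry decision above boils down to the status code. The sketch below is a
standalone restatement rather than the plugin's code: a 2xx response is
success, HTTP 400 (for example, timestamps outside the service window) is a
permanent failure that must not be retried, and everything else is treated as
transient:

```go
package main

import (
	"fmt"
	"net/http"
)

// classify restates the rule used by send: 2xx is success, HTTP 400 is
// a permanent failure that must not be retried, anything else is
// considered transient and worth retrying.
func classify(statusCode int) (retryable bool, err error) {
	if statusCode >= 200 && statusCode <= 299 {
		return false, nil
	}
	retryable = statusCode != http.StatusBadRequest
	return retryable, fmt.Errorf("failed to write batch: [%d]", statusCode)
}

func main() {
	for _, code := range []int{204, 400, 503} {
		retryable, err := classify(code)
		fmt.Printf("%d -> retryable=%t err=%v\n", code, retryable, err)
	}
}
```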
// vmMetadata retrieves metadata about the current Azure VM
@@ -533,12 +583,15 @@ func getIntField(m telegraf.Metric, key string) (int64, error) {
}
return 0, fmt.Errorf("unexpected type: %s: %T", key, fv)
}
func init() {
outputs.Add("azure_monitor", func() telegraf.Output {
return &AzureMonitor{
NamespacePrefix: "Telegraf/",
Timeout: config.Duration(5 * time.Second),
timeFunc: time.Now,
NamespacePrefix: "Telegraf/",
TimestampLimitPast: config.Duration(30 * time.Minute),
TimestampLimitFuture: config.Duration(-1 * time.Minute),
Timeout: config.Duration(5 * time.Second),
timeFunc: time.Now,
}
})
}
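Note the sign convention of the two defaults registered above: the past limit
is a positive duration that is negated at each call site, while the future
limit is added as-is, so the default of `-1m` pulls the upper bound of the
window into the past. A small standalone sketch of the arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Date(2025, 2, 4, 16, 0, 0, 0, time.UTC)
	past := 30 * time.Minute   // timestamp_limit_past, negated on use
	future := -1 * time.Minute // timestamp_limit_future, added as-is

	fmt.Println("earliest accepted:", now.Add(-past))  // 15:30:00 UTC
	fmt.Println("latest accepted:  ", now.Add(future)) // 15:59:00 UTC
}
```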


@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
@@ -176,11 +177,13 @@ func TestAggregate(t *testing.T) {
// Setup plugin
plugin := &AzureMonitor{
Region: "test",
ResourceID: "/test",
StringsAsDimensions: tt.stringdim,
Log: testutil.Logger{},
timeFunc: func() time.Time { return tt.addTime },
Region: "test",
ResourceID: "/test",
StringsAsDimensions: tt.stringdim,
TimestampLimitPast: config.Duration(30 * time.Minute),
TimestampLimitFuture: config.Duration(-1 * time.Minute),
Log: testutil.Logger{},
timeFunc: func() time.Time { return tt.addTime },
}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
@@ -233,6 +236,7 @@ func TestWrite(t *testing.T) {
time.Unix(0, 0),
),
},
errmsg: "translating metric(s) failed",
},
{
name: "single azure metric",
@@ -315,11 +319,13 @@ func TestWrite(t *testing.T) {
// Setup the plugin
plugin := AzureMonitor{
EndpointURL: "http://" + ts.Listener.Addr().String(),
Region: "test",
ResourceID: "/test",
Log: testutil.Logger{},
timeFunc: func() time.Time { return time.Unix(120, 0) },
EndpointURL: "http://" + ts.Listener.Addr().String(),
Region: "test",
ResourceID: "/test",
TimestampLimitPast: config.Duration(30 * time.Minute),
TimestampLimitFuture: config.Duration(-1 * time.Minute),
Log: testutil.Logger{},
timeFunc: func() time.Time { return time.Unix(120, 0) },
}
require.NoError(t, plugin.Init())
@@ -328,9 +334,6 @@ func TestWrite(t *testing.T) {
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Override with testing setup
plugin.preparer = autorest.CreatePreparer(autorest.NullAuthorizer{}.WithAuthorization())
err := plugin.Write(tt.metrics)
if tt.errmsg != "" {
require.ErrorContains(t, err, tt.errmsg)
@@ -512,6 +515,30 @@ func TestWriteTimelimits(t *testing.T) {
expectedCount: len(inputs),
expectedError: "400 Bad Request: " + msg,
},
{
name: "default limit",
input: inputs,
limitPast: 30 * time.Minute,
limitFuture: -1 * time.Minute,
expectedCount: 2,
expectedError: "metric(s) outside of acceptable time window",
},
{
name: "permissive limit",
input: inputs,
limitPast: 30 * time.Minute,
limitFuture: 5 * time.Minute,
expectedCount: len(inputs) - 2,
expectedError: "metric(s) outside of acceptable time window",
},
{
name: "very strict",
input: inputs,
limitPast: 19*time.Minute + 59*time.Second,
limitFuture: 3*time.Minute + 59*time.Second,
expectedCount: len(inputs) - 6,
expectedError: "metric(s) outside of acceptable time window",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -564,11 +591,13 @@ func TestWriteTimelimits(t *testing.T) {
// Setup plugin
plugin := AzureMonitor{
EndpointURL: "http://" + ts.Listener.Addr().String(),
Region: "test",
ResourceID: "/test",
Log: testutil.Logger{},
timeFunc: func() time.Time { return tref },
EndpointURL: "http://" + ts.Listener.Addr().String(),
Region: "test",
ResourceID: "/test",
TimestampLimitPast: config.Duration(tt.limitPast),
TimestampLimitFuture: config.Duration(tt.limitFuture),
Log: testutil.Logger{},
timeFunc: func() time.Time { return tref },
}
require.NoError(t, plugin.Init())


@@ -27,3 +27,11 @@
## cloud environment, set the appropriate REST endpoint for receiving
## metrics. (Note: region may be unused in this context)
# endpoint_url = "https://monitoring.core.usgovcloudapi.net"
## Time limitations of metrics to send
## Documentation can be found here:
## https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp
## However, the returned (400) error message might document stricter or
## more relaxed settings. By default, only past metrics within the limit are sent.
# timestamp_limit_past = "30m"
# timestamp_limit_future = "-1m"


@@ -6,8 +6,9 @@ import (
)
type azureMonitorMetric struct {
Time time.Time `json:"time"`
Data *azureMonitorData `json:"data"`
Time time.Time `json:"time"`
Data *azureMonitorData `json:"data"`
index int
}
type azureMonitorData struct {