feat(outputs.health): Add max time between metrics check (#16646)

This commit is contained in:
Lucas Chiesa 2025-04-14 18:03:22 +02:00 committed by GitHub
parent 58d8496671
commit be9e5bfbb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 166 additions and 11 deletions

View File

@ -46,6 +46,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# tls_cert = "/etc/telegraf/cert.pem" # tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Maximum expected time between metrics being written
## Enforces an unhealthy state if there was no new metric seen for at least
## the specified time. The check is disabled by default and only used if a
## positive time is specified.
# max_time_between_metrics = "0s"
## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
## plugin definition, otherwise additional config options are read as part of ## plugin definition, otherwise additional config options are read as part of
## the table ## the table
@ -67,6 +73,19 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## field = "buffer_size" ## field = "buffer_size"
``` ```
### Maximum time between metrics
The health plugin can assert that metrics are being delivered to it at an
expected rate when setting `max_time_between_metrics` to a positive number.
The check measures the time between consecutive writes to the plugin and
compares it to the defined `max_time_between_metrics`. When the time
elapsed between writes is greater than the configured maximum time, the plugin
will report an unhealthy status. As soon as metrics are written again to the
plugin, the health status will reset to healthy.
Note that the metric timestamps are not taken into account, rather the time they
are written to the plugin.
### compares ### compares
The `compares` check is used to assert basic mathematical relationships. Use The `compares` check is used to assert basic mathematical relationships. Use

View File

@ -41,17 +41,20 @@ type Health struct {
BasicPassword string `toml:"basic_password"` BasicPassword string `toml:"basic_password"`
common_tls.ServerConfig common_tls.ServerConfig
Compares []*Compares `toml:"compares"` Compares []*Compares `toml:"compares"`
Contains []*Contains `toml:"contains"` Contains []*Contains `toml:"contains"`
MaxTimeBetweenMetrics config.Duration `toml:"max_time_between_metrics"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
checkers []Checker checkers []Checker
wg sync.WaitGroup wg sync.WaitGroup
server *http.Server server *http.Server
origin string origin string
network string network string
address string address string
tlsConf *tls.Config tlsConf *tls.Config
lastMetricTime time.Time
mu sync.Mutex mu sync.Mutex
healthy bool healthy bool
@ -117,7 +120,9 @@ func (h *Health) Connect() error {
h.origin = h.getOrigin(listener) h.origin = h.getOrigin(listener)
h.Log.Infof("Listening on %s", h.origin) h.Log.Infof("Listening on %s", h.origin)
// Initialize lastMetricTime here to fail if no metrics are received
// before the configured max timeout.
h.lastMetricTime = time.Now()
h.wg.Add(1) h.wg.Add(1)
go func() { go func() {
defer h.wg.Done() defer h.wg.Done()
@ -143,7 +148,13 @@ func (h *Health) listen() (net.Listener, error) {
func (h *Health) ServeHTTP(rw http.ResponseWriter, _ *http.Request) { func (h *Health) ServeHTTP(rw http.ResponseWriter, _ *http.Request) {
var code = http.StatusOK var code = http.StatusOK
if !h.isHealthy() {
healthy := h.isHealthy()
if h.MaxTimeBetweenMetrics > 0 {
healthy = healthy && time.Since(h.lastMetricTime) < time.Duration(h.MaxTimeBetweenMetrics)
}
if !healthy {
code = http.StatusServiceUnavailable code = http.StatusServiceUnavailable
} }
@ -153,6 +164,7 @@ func (h *Health) ServeHTTP(rw http.ResponseWriter, _ *http.Request) {
// Write runs all checks over the metric batch and adjust health state. // Write runs all checks over the metric batch and adjust health state.
func (h *Health) Write(metrics []telegraf.Metric) error { func (h *Health) Write(metrics []telegraf.Metric) error {
h.lastMetricTime = time.Now()
healthy := true healthy := true
for _, checker := range h.checkers { for _, checker := range h.checkers {
success := checker.Check(metrics) success := checker.Check(metrics)
@ -160,7 +172,9 @@ func (h *Health) Write(metrics []telegraf.Metric) error {
healthy = false healthy = false
} }
} }
// healthy only represents the result of the configured checkers and not
// the MaxTimeBetweenMetrics validation. The timeout check is done when
// serving the HTTP response.
h.setHealthy(healthy) h.setHealthy(healthy)
return nil return nil
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs/health" "github.com/influxdata/telegraf/plugins/outputs/health"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -214,3 +215,118 @@ func TestInitServiceAddress(t *testing.T) {
}) })
} }
} }
func TestTimeBetweenMetrics(t *testing.T) {
arbitraryTime := time.Time{}.AddDate(2002, 0, 0)
tests := []struct {
name string
maxTimeBetweenMetrics config.Duration
metrics []telegraf.Metric
delay time.Duration
expectedCode int
}{
{
name: "healthy enabled no metrics before timeout",
maxTimeBetweenMetrics: config.Duration(1 * time.Second),
metrics: nil,
delay: 0 * time.Second,
expectedCode: 200,
},
{
name: "unhealthy enabled no metrics after timeout",
maxTimeBetweenMetrics: config.Duration(5 * time.Millisecond),
metrics: nil,
delay: 5 * time.Millisecond,
expectedCode: 503,
},
{
name: "healthy when disabled and old metric",
maxTimeBetweenMetrics: config.Duration(0),
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]any{
"time_idle": 42,
},
arbitraryTime),
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]any{
"time_idle": 64,
},
arbitraryTime),
},
delay: 10 * time.Millisecond,
expectedCode: 200,
},
{
name: "healthy when enabled and recent metric",
maxTimeBetweenMetrics: config.Duration(5 * time.Second),
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]any{
"time_idle": 42,
},
arbitraryTime),
},
delay: 0 * time.Second,
expectedCode: 200,
},
{
name: "unhealthy when enabled and old metric",
maxTimeBetweenMetrics: config.Duration(5 * time.Millisecond),
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]any{
"time_idle": 42,
},
arbitraryTime),
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]any{
"time_idle": 64,
},
arbitraryTime),
},
delay: 10 * time.Millisecond,
expectedCode: 503,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dut := health.NewHealth()
dut.ServiceAddress = "tcp://127.0.0.1:0"
dut.Log = testutil.Logger{}
dut.MaxTimeBetweenMetrics = tt.maxTimeBetweenMetrics
err := dut.Init()
require.NoError(t, err)
err = dut.Connect()
require.NoError(t, err)
err = dut.Write(tt.metrics)
require.NoError(t, err)
time.Sleep(tt.delay)
resp, err := http.Get(dut.Origin())
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, tt.expectedCode, resp.StatusCode)
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
err = dut.Close()
require.NoError(t, err)
})
}
}

View File

@ -21,6 +21,12 @@
# tls_cert = "/etc/telegraf/cert.pem" # tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem" # tls_key = "/etc/telegraf/key.pem"
## Maximum expected time between metrics being written
## Enforces an unhealthy state if there was no new metric seen for at least
## the specified time. The check is disabled by default and only used if a
## positive time is specified.
# max_time_between_metrics = "0s"
## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
## plugin definition, otherwise additional config options are read as part of ## plugin definition, otherwise additional config options are read as part of
## the table ## the table