diff --git a/plugins/outputs/health/README.md b/plugins/outputs/health/README.md index ad0a5db91..284eec266 100644 --- a/plugins/outputs/health/README.md +++ b/plugins/outputs/health/README.md @@ -46,6 +46,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" + ## Maximum expected time between metrics being written + ## Enforces an unhealthy state if there was no new metric seen for at least + ## the specified time. The check is disabled by default and only used if a + ## positive time is specified. + # max_time_between_metrics = "0s" + ## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## plugin definition, otherwise additional config options are read as part of ## the table @@ -67,6 +73,19 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## field = "buffer_size" ``` +### Maximum time between metrics + +The health plugin can assert that metrics are being delivered to it at an +expected rate when `max_time_between_metrics` is set to a positive duration. +The check measures the time between consecutive writes to the plugin and +compares it to the defined `max_time_between_metrics`. When the time +elapsed between writes is greater than the configured maximum time, the plugin +will report an unhealthy status. As soon as metrics are written again to the +plugin, the health status will reset to healthy. + +Note that the metric timestamps are not taken into account, only the time at +which the metrics are written to the plugin. + ### compares The `compares` check is used to assert basic mathematical relationships. 
Use diff --git a/plugins/outputs/health/health.go b/plugins/outputs/health/health.go index e99da4044..216535bca 100644 --- a/plugins/outputs/health/health.go +++ b/plugins/outputs/health/health.go @@ -41,17 +41,20 @@ type Health struct { BasicPassword string `toml:"basic_password"` common_tls.ServerConfig - Compares []*Compares `toml:"compares"` - Contains []*Contains `toml:"contains"` + Compares []*Compares `toml:"compares"` + Contains []*Contains `toml:"contains"` + MaxTimeBetweenMetrics config.Duration `toml:"max_time_between_metrics"` + Log telegraf.Logger `toml:"-"` checkers []Checker - wg sync.WaitGroup - server *http.Server - origin string - network string - address string - tlsConf *tls.Config + wg sync.WaitGroup + server *http.Server + origin string + network string + address string + tlsConf *tls.Config + lastMetricTime time.Time mu sync.Mutex healthy bool @@ -117,7 +120,9 @@ func (h *Health) Connect() error { h.origin = h.getOrigin(listener) h.Log.Infof("Listening on %s", h.origin) - + // Initialize lastMetricTime here to fail if no metrics are received + // before the configured max timeout. + h.lastMetricTime = time.Now() h.wg.Add(1) go func() { defer h.wg.Done() @@ -143,7 +148,13 @@ func (h *Health) listen() (net.Listener, error) { func (h *Health) ServeHTTP(rw http.ResponseWriter, _ *http.Request) { var code = http.StatusOK - if !h.isHealthy() { + + healthy := h.isHealthy() + if h.MaxTimeBetweenMetrics > 0 { + healthy = healthy && time.Since(h.lastMetricTime) < time.Duration(h.MaxTimeBetweenMetrics) + } + + if !healthy { code = http.StatusServiceUnavailable } @@ -153,6 +164,7 @@ func (h *Health) ServeHTTP(rw http.ResponseWriter, _ *http.Request) { // Write runs all checks over the metric batch and adjust health state. 
func (h *Health) Write(metrics []telegraf.Metric) error { + h.lastMetricTime = time.Now() healthy := true for _, checker := range h.checkers { success := checker.Check(metrics) @@ -160,7 +172,9 @@ func (h *Health) Write(metrics []telegraf.Metric) error { healthy = false } } - + // healthy only represents the result of the configured checkers and not + // the MaxTimeBetweenMetrics validation. The timeout check is done when + // serving the HTTP response. h.setHealthy(healthy) return nil } diff --git a/plugins/outputs/health/health_test.go b/plugins/outputs/health/health_test.go index e155a6cba..e070e5f53 100644 --- a/plugins/outputs/health/health_test.go +++ b/plugins/outputs/health/health_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs/health" "github.com/influxdata/telegraf/testutil" ) @@ -214,3 +215,118 @@ func TestInitServiceAddress(t *testing.T) { }) } } + +func TestTimeBetweenMetrics(t *testing.T) { + arbitraryTime := time.Time{}.AddDate(2002, 0, 0) + tests := []struct { + name string + maxTimeBetweenMetrics config.Duration + metrics []telegraf.Metric + delay time.Duration + expectedCode int + }{ + { + name: "healthy enabled no metrics before timeout", + maxTimeBetweenMetrics: config.Duration(1 * time.Second), + metrics: nil, + delay: 0 * time.Second, + expectedCode: 200, + }, + { + name: "unhealthy enabled no metrics after timeout", + maxTimeBetweenMetrics: config.Duration(5 * time.Millisecond), + metrics: nil, + delay: 5 * time.Millisecond, + expectedCode: 503, + }, + { + name: "healthy when disabled and old metric", + maxTimeBetweenMetrics: config.Duration(0), + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]any{ + "time_idle": 42, + }, + arbitraryTime), + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]any{ + "time_idle": 64, + }, + 
arbitraryTime), + }, + delay: 10 * time.Millisecond, + expectedCode: 200, + }, + { + name: "healthy when enabled and recent metric", + maxTimeBetweenMetrics: config.Duration(5 * time.Second), + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]any{ + "time_idle": 42, + }, + arbitraryTime), + }, + delay: 0 * time.Second, + expectedCode: 200, + }, + { + name: "unhealthy when enabled and old metric", + maxTimeBetweenMetrics: config.Duration(5 * time.Millisecond), + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]any{ + "time_idle": 42, + }, + arbitraryTime), + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]any{ + "time_idle": 64, + }, + arbitraryTime), + }, + delay: 10 * time.Millisecond, + expectedCode: 503, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dut := health.NewHealth() + dut.ServiceAddress = "tcp://127.0.0.1:0" + dut.Log = testutil.Logger{} + dut.MaxTimeBetweenMetrics = tt.maxTimeBetweenMetrics + + err := dut.Init() + require.NoError(t, err) + + err = dut.Connect() + require.NoError(t, err) + + err = dut.Write(tt.metrics) + require.NoError(t, err) + + time.Sleep(tt.delay) + resp, err := http.Get(dut.Origin()) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, tt.expectedCode, resp.StatusCode) + + _, err = io.ReadAll(resp.Body) + require.NoError(t, err) + + err = dut.Close() + require.NoError(t, err) + }) + } +} diff --git a/plugins/outputs/health/sample.conf b/plugins/outputs/health/sample.conf index 07d5a4b3d..3e7207242 100644 --- a/plugins/outputs/health/sample.conf +++ b/plugins/outputs/health/sample.conf @@ -21,6 +21,12 @@ # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" + ## Maximum expected time between metrics being written + ## Enforces an unhealthy state if there was no new metric seen for at least + ## the specified time. 
The check is disabled by default and only used if a + ## positive time is specified. + # max_time_between_metrics = "0s" + ## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## plugin definition, otherwise additional config options are read as part of ## the table