fix(outputs.opensearch): Correctly error during failures or disconnect (#15157)

This commit is contained in:
Joshua Powers 2024-04-19 07:23:05 -06:00 committed by GitHub
parent 2acae45d09
commit 6e3577f40e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 70 additions and 3 deletions

View File

@ -142,6 +142,11 @@ func (o *Opensearch) Connect() error {
}
}
_, err = o.osClient.Ping()
if err != nil {
return fmt.Errorf("unable to ping OpenSearch server: %w", err)
}
return nil
}
@ -296,9 +301,10 @@ func (o *Opensearch) Write(metrics []telegraf.Metric) error {
// Report the indexer statistics
stats := bulkIndxr.Stats()
if stats.NumAdded < uint64(len(metrics)) {
return fmt.Errorf("indexed [%d] documents with [%d] errors", stats.NumAdded, stats.NumFailed)
if stats.NumFailed > 0 {
return fmt.Errorf("failed to index [%d] documents", stats.NumFailed)
}
o.Log.Debugf("Successfully indexed [%d] documents", stats.NumAdded)
}
@ -364,7 +370,6 @@ func (o *Opensearch) GetIndexName(metric telegraf.Metric) (string, error) {
if strings.Contains(indexName, "{{") {
return "", fmt.Errorf("failed to evaluate valid indexname: %s", indexName)
}
o.Log.Debugf("indexName- %s", indexName)
return indexName, nil
}

View File

@ -253,3 +253,65 @@ func TestAuthorizationHeaderWhenBearerTokenIsPresent(t *testing.T) {
err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestDisconnectedServerOnConnect(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}))
urls := []string{"http://" + ts.Listener.Addr().String()}
e := &Opensearch{
URLs: urls,
IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: false,
Log: testutil.Logger{},
AuthBearerToken: config.NewSecret([]byte("0123456789abcdef")),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
// Close the server right before we try to connect.
ts.Close()
require.Error(t, e.Connect())
}
func TestDisconnectedServerOnWrite(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/_bulk":
require.Equal(t, "Bearer 0123456789abcdef", r.Header.Get("Authorization"))
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
return
default:
_, err := w.Write([]byte(`{"version": {"number": "7.8"}}`))
require.NoError(t, err)
return
}
}))
defer ts.Close()
urls := []string{"http://" + ts.Listener.Addr().String()}
e := &Opensearch{
URLs: urls,
IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: false,
Log: testutil.Logger{},
AuthBearerToken: config.NewSecret([]byte("0123456789abcdef")),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
require.NoError(t, e.Connect())
err := e.Write(testutil.MockMetrics())
require.NoError(t, err)
// Close the server right before we try to write a second time.
ts.Close()
err = e.Write(testutil.MockMetrics())
require.Error(t, err)
}