diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 21265ba44..5c11d2821 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -22,13 +22,14 @@ import ( ) const ( - defaultRequestTimeout = time.Second * 5 - defaultDatabase = "telegraf" - errStringDatabaseNotFound = "database not found" - errStringHintedHandoffNotEmpty = "hinted handoff queue not empty" - errStringPartialWrite = "partial write" - errStringPointsBeyondRP = "points beyond retention policy" - errStringUnableToParse = "unable to parse" + defaultRequestTimeout = time.Second * 5 + defaultDatabase = "telegraf" + errStringDatabaseNotFound = "database not found" + errStringRetentionPolicyNotFound = "retention policy not found" + errStringHintedHandoffNotEmpty = "hinted handoff queue not empty" + errStringPartialWrite = "partial write" + errStringPointsBeyondRP = "points beyond retention policy" + errStringUnableToParse = "unable to parse" ) var ( @@ -356,7 +357,7 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te body, err := c.validateResponse(resp.Body) - // Check for poorly formatted response (can't be decoded) + // Check for poorly formatted response that can't be decoded if err != nil { return &APIError{ StatusCode: resp.StatusCode, @@ -373,7 +374,6 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te if err == nil { desc = writeResp.Err } - if strings.Contains(desc, errStringDatabaseNotFound) { return &DatabaseNotFoundError{ APIError: APIError{ @@ -385,6 +385,18 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te } } + //checks for any 4xx code and drops metric and retrying will not make the request work + if len(resp.Status) > 0 && resp.Status[0] == '4' { + c.log.Errorf("E! [outputs.influxdb] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc) + return nil + } + + // This error handles if there is an invaild or missing retention policy + if strings.Contains(desc, errStringRetentionPolicyNotFound) { + c.log.Errorf("When writing to [%s]: received error %v", c.URL(), desc) + return nil + } + // This "error" is an informational message about the state of the // InfluxDB cluster. if strings.Contains(desc, errStringHintedHandoffNotEmpty) { diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index 2f46e2441..39ac2b108 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -1,3 +1,4 @@ +//nolint package influxdb_test import ( @@ -13,7 +14,6 @@ import ( "net/url" "os" "path" - "strings" "testing" "time" @@ -386,7 +386,7 @@ func TestHTTP_Write(t *testing.T) { }, }, { - name: "hinted handoff not empty no log no error", + name: "hinted handoff not empty no error", config: influxdb.HTTPConfig{ URL: u, Database: "telegraf", @@ -396,8 +396,8 @@ func TestHTTP_Write(t *testing.T) { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(`{"error": "write failed: hinted handoff queue not empty"}`)) }, - logFunc: func(t *testing.T, str string) { - require.False(t, strings.Contains(str, "hinted handoff queue not empty")) + errFunc: func(t *testing.T, err error) { + require.NoError(t, err) }, }, { @@ -1077,19 +1077,6 @@ func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) { handlers := &MockHandlerChain{ handlers: []http.HandlerFunc{ - func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/query": - if r.FormValue("q") != `CREATE DATABASE "telegraf"` { - w.WriteHeader(http.StatusInternalServerError) - return - } - w.WriteHeader(http.StatusForbidden) - w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`)) - default: - w.WriteHeader(http.StatusInternalServerError) - } - }, func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/write": @@ -1147,9 +1134,61 @@ func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) { err = output.Connect() require.NoError(t, err) err = output.Write(metrics) - require.Error(t, err) + require.NoError(t, err) err = output.Write(metrics) require.NoError(t, err) require.True(t, handlers.Done(), "all handlers not called") } + +func TestDBNotFoundShouldDropMetricWhenSkipDatabaseCreateIsTrue(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + f := func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/write": + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{"error": "database not found: \"telegraf\""}`)) + default: + w.WriteHeader(http.StatusInternalServerError) + } + } + + ts.Config.Handler = http.HandlerFunc(f) + + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + } + + logger := &testutil.CaptureLogger{} + output := influxdb.InfluxDB{ + URL: u.String(), + Database: "telegraf", + DatabaseTag: "database", + SkipDatabaseCreation: true, + Log: logger, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + } + + err = output.Connect() + require.NoError(t, err) + err = output.Write(metrics) + require.Contains(t, logger.LastError, "database not found") + require.NoError(t, err) + + err = output.Write(metrics) + require.Contains(t, logger.LastError, "database not found") + require.NoError(t, err) +} diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 0bb4c01cc..36b38a9c9 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -210,6 +210,7 @@ func (i *InfluxDB) SampleConfig() string { func (i *InfluxDB) Write(metrics []telegraf.Metric) error { ctx := context.Background() + allErrorsAreDatabaseNotFoundErrors := true var err error p := rand.Perm(len(i.clients)) for _, n := range p { @@ -219,20 +220,28 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return nil } + i.Log.Errorf("When writing to [%s]: %v", client.URL(), err) + switch apiError := err.(type) { case *DatabaseNotFoundError: if !i.SkipDatabaseCreation { + allErrorsAreDatabaseNotFoundErrors = false err := client.CreateDatabase(ctx, apiError.Database) if err != nil { i.Log.Errorf("When writing to [%s]: database %q not found and failed to recreate", client.URL(), apiError.Database) + } else { + // try another client, if all clients fail with this error, do not return error + continue } } } - - i.Log.Errorf("When writing to [%s]: %v", client.URL(), err) } + if allErrorsAreDatabaseNotFoundErrors { + // return nil because we should not be retrying this + return nil + } return errors.New("could not write any address") } diff --git a/testutil/capturelog.go b/testutil/capturelog.go new file mode 100644 index 000000000..d26609fff --- /dev/null +++ b/testutil/capturelog.go @@ -0,0 +1,60 @@ +package testutil + +import ( + "fmt" + "log" //nolint + + "github.com/influxdata/telegraf" +) + +var _ telegraf.Logger = &CaptureLogger{} + +// CaptureLogger defines a logging structure for plugins. +type CaptureLogger struct { + Name string // Name is the plugin name, will be printed in the `[]`. + LastError string +} + +// Errorf logs an error message, patterned after log.Printf. +func (l *CaptureLogger) Errorf(format string, args ...interface{}) { + s := fmt.Sprintf("E! ["+l.Name+"] "+format, args...) + l.LastError = s + log.Print(s) +} + +// Error logs an error message, patterned after log.Print. +func (l *CaptureLogger) Error(args ...interface{}) { + s := fmt.Sprint(append([]interface{}{"E! [" + l.Name + "] "}, args...)...) + l.LastError = s + log.Print(s) +} + +// Debugf logs a debug message, patterned after log.Printf. +func (l *CaptureLogger) Debugf(format string, args ...interface{}) { + log.Printf("D! ["+l.Name+"] "+format, args...) +} + +// Debug logs a debug message, patterned after log.Print. +func (l *CaptureLogger) Debug(args ...interface{}) { + log.Print(append([]interface{}{"D! [" + l.Name + "] "}, args...)...) +} + +// Warnf logs a warning message, patterned after log.Printf. +func (l *CaptureLogger) Warnf(format string, args ...interface{}) { + log.Printf("W! ["+l.Name+"] "+format, args...) +} + +// Warn logs a warning message, patterned after log.Print. +func (l *CaptureLogger) Warn(args ...interface{}) { + log.Print(append([]interface{}{"W! [" + l.Name + "] "}, args...)...) +} + +// Infof logs an information message, patterned after log.Printf. +func (l *CaptureLogger) Infof(format string, args ...interface{}) { + log.Printf("I! ["+l.Name+"] "+format, args...) +} + +// Info logs an information message, patterned after log.Print. +func (l *CaptureLogger) Info(args ...interface{}) { + log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...) +}