Bugfix outputs influxdb endless retires (#9296)

This commit is contained in:
Mya Longmire 2021-06-10 15:05:43 -06:00 committed by GitHub
parent f6a9d104f8
commit da7f2c7a93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 149 additions and 29 deletions

View File

@ -22,13 +22,14 @@ import (
) )
const ( const (
defaultRequestTimeout = time.Second * 5 defaultRequestTimeout = time.Second * 5
defaultDatabase = "telegraf" defaultDatabase = "telegraf"
errStringDatabaseNotFound = "database not found" errStringDatabaseNotFound = "database not found"
errStringHintedHandoffNotEmpty = "hinted handoff queue not empty" errStringRetentionPolicyNotFound = "retention policy not found"
errStringPartialWrite = "partial write" errStringHintedHandoffNotEmpty = "hinted handoff queue not empty"
errStringPointsBeyondRP = "points beyond retention policy" errStringPartialWrite = "partial write"
errStringUnableToParse = "unable to parse" errStringPointsBeyondRP = "points beyond retention policy"
errStringUnableToParse = "unable to parse"
) )
var ( var (
@ -356,7 +357,7 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te
body, err := c.validateResponse(resp.Body) body, err := c.validateResponse(resp.Body)
// Check for poorly formatted response (can't be decoded) // Check for poorly formatted response that can't be decoded
if err != nil { if err != nil {
return &APIError{ return &APIError{
StatusCode: resp.StatusCode, StatusCode: resp.StatusCode,
@ -373,7 +374,6 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te
if err == nil { if err == nil {
desc = writeResp.Err desc = writeResp.Err
} }
if strings.Contains(desc, errStringDatabaseNotFound) { if strings.Contains(desc, errStringDatabaseNotFound) {
return &DatabaseNotFoundError{ return &DatabaseNotFoundError{
APIError: APIError{ APIError: APIError{
@ -385,6 +385,18 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te
} }
} }
//checks for any 4xx code and drops metric and retrying will not make the request work
if len(resp.Status) > 0 && resp.Status[0] == '4' {
c.log.Errorf("E! [outputs.influxdb] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
return nil
}
// This error handles if there is an invaild or missing retention policy
if strings.Contains(desc, errStringRetentionPolicyNotFound) {
c.log.Errorf("When writing to [%s]: received error %v", c.URL(), desc)
return nil
}
// This "error" is an informational message about the state of the // This "error" is an informational message about the state of the
// InfluxDB cluster. // InfluxDB cluster.
if strings.Contains(desc, errStringHintedHandoffNotEmpty) { if strings.Contains(desc, errStringHintedHandoffNotEmpty) {

View File

@ -1,3 +1,4 @@
//nolint
package influxdb_test package influxdb_test
import ( import (
@ -13,7 +14,6 @@ import (
"net/url" "net/url"
"os" "os"
"path" "path"
"strings"
"testing" "testing"
"time" "time"
@ -386,7 +386,7 @@ func TestHTTP_Write(t *testing.T) {
}, },
}, },
{ {
name: "hinted handoff not empty no log no error", name: "hinted handoff not empty no error",
config: influxdb.HTTPConfig{ config: influxdb.HTTPConfig{
URL: u, URL: u,
Database: "telegraf", Database: "telegraf",
@ -396,8 +396,8 @@ func TestHTTP_Write(t *testing.T) {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(`{"error": "write failed: hinted handoff queue not empty"}`)) w.Write([]byte(`{"error": "write failed: hinted handoff queue not empty"}`))
}, },
logFunc: func(t *testing.T, str string) { errFunc: func(t *testing.T, err error) {
require.False(t, strings.Contains(str, "hinted handoff queue not empty")) require.NoError(t, err)
}, },
}, },
{ {
@ -1077,19 +1077,6 @@ func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) {
handlers := &MockHandlerChain{ handlers := &MockHandlerChain{
handlers: []http.HandlerFunc{ handlers: []http.HandlerFunc{
func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/query":
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`))
default:
w.WriteHeader(http.StatusInternalServerError)
}
},
func(w http.ResponseWriter, r *http.Request) { func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path { switch r.URL.Path {
case "/write": case "/write":
@ -1147,9 +1134,61 @@ func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) {
err = output.Connect() err = output.Connect()
require.NoError(t, err) require.NoError(t, err)
err = output.Write(metrics) err = output.Write(metrics)
require.Error(t, err) require.NoError(t, err)
err = output.Write(metrics) err = output.Write(metrics)
require.NoError(t, err) require.NoError(t, err)
require.True(t, handlers.Done(), "all handlers not called") require.True(t, handlers.Done(), "all handlers not called")
} }
func TestDBNotFoundShouldDropMetricWhenSkipDatabaseCreateIsTrue(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
f := func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error": "database not found: \"telegraf\""}`))
default:
w.WriteHeader(http.StatusInternalServerError)
}
}
ts.Config.Handler = http.HandlerFunc(f)
metrics := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
}
logger := &testutil.CaptureLogger{}
output := influxdb.InfluxDB{
URL: u.String(),
Database: "telegraf",
DatabaseTag: "database",
SkipDatabaseCreation: true,
Log: logger,
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
}
err = output.Connect()
require.NoError(t, err)
err = output.Write(metrics)
require.Contains(t, logger.LastError, "database not found")
require.NoError(t, err)
err = output.Write(metrics)
require.Contains(t, logger.LastError, "database not found")
require.NoError(t, err)
}

View File

@ -210,6 +210,7 @@ func (i *InfluxDB) SampleConfig() string {
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
ctx := context.Background() ctx := context.Background()
allErrorsAreDatabaseNotFoundErrors := true
var err error var err error
p := rand.Perm(len(i.clients)) p := rand.Perm(len(i.clients))
for _, n := range p { for _, n := range p {
@ -219,20 +220,28 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
i.Log.Errorf("When writing to [%s]: %v", client.URL(), err)
switch apiError := err.(type) { switch apiError := err.(type) {
case *DatabaseNotFoundError: case *DatabaseNotFoundError:
if !i.SkipDatabaseCreation { if !i.SkipDatabaseCreation {
allErrorsAreDatabaseNotFoundErrors = false
err := client.CreateDatabase(ctx, apiError.Database) err := client.CreateDatabase(ctx, apiError.Database)
if err != nil { if err != nil {
i.Log.Errorf("When writing to [%s]: database %q not found and failed to recreate", i.Log.Errorf("When writing to [%s]: database %q not found and failed to recreate",
client.URL(), apiError.Database) client.URL(), apiError.Database)
} else {
// try another client, if all clients fail with this error, do not return error
continue
} }
} }
} }
i.Log.Errorf("When writing to [%s]: %v", client.URL(), err)
} }
if allErrorsAreDatabaseNotFoundErrors {
// return nil because we should not be retrying this
return nil
}
return errors.New("could not write any address") return errors.New("could not write any address")
} }

60
testutil/capturelog.go Normal file
View File

@ -0,0 +1,60 @@
package testutil
import (
"fmt"
"log" //nolint
"github.com/influxdata/telegraf"
)
var _ telegraf.Logger = &CaptureLogger{}
// CaptureLogger defines a logging structure for plugins.
type CaptureLogger struct {
Name string // Name is the plugin name, will be printed in the `[]`.
LastError string
}
// Errorf logs an error message, patterned after log.Printf.
func (l *CaptureLogger) Errorf(format string, args ...interface{}) {
s := fmt.Sprintf("E! ["+l.Name+"] "+format, args...)
l.LastError = s
log.Print(s)
}
// Error logs an error message, patterned after log.Print.
func (l *CaptureLogger) Error(args ...interface{}) {
s := fmt.Sprint(append([]interface{}{"E! [" + l.Name + "] "}, args...)...)
l.LastError = s
log.Print(s)
}
// Debugf logs a debug message, patterned after log.Printf.
func (l *CaptureLogger) Debugf(format string, args ...interface{}) {
log.Printf("D! ["+l.Name+"] "+format, args...)
}
// Debug logs a debug message, patterned after log.Print.
func (l *CaptureLogger) Debug(args ...interface{}) {
log.Print(append([]interface{}{"D! [" + l.Name + "] "}, args...)...)
}
// Warnf logs a warning message, patterned after log.Printf.
func (l *CaptureLogger) Warnf(format string, args ...interface{}) {
log.Printf("W! ["+l.Name+"] "+format, args...)
}
// Warn logs a warning message, patterned after log.Print.
func (l *CaptureLogger) Warn(args ...interface{}) {
log.Print(append([]interface{}{"W! [" + l.Name + "] "}, args...)...)
}
// Infof logs an information message, patterned after log.Printf.
func (l *CaptureLogger) Infof(format string, args ...interface{}) {
log.Printf("I! ["+l.Name+"] "+format, args...)
}
// Info logs an information message, patterned after log.Print.
func (l *CaptureLogger) Info(args ...interface{}) {
log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...)
}