Fix metrics reported as written but not actually written (#9526)
This commit is contained in:
parent
fdec5f1f31
commit
8d2b1e8dc1
|
|
@ -1077,6 +1077,19 @@ func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) {
|
||||||
|
|
||||||
handlers := &MockHandlerChain{
|
handlers := &MockHandlerChain{
|
||||||
handlers: []http.HandlerFunc{
|
handlers: []http.HandlerFunc{
|
||||||
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/query":
|
||||||
|
if r.FormValue("q") != `CREATE DATABASE "telegraf"` {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusForbidden)
|
||||||
|
w.Write([]byte(`{"results": [{"error": "error authorizing query"}]}`))
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
},
|
||||||
func(w http.ResponseWriter, r *http.Request) {
|
func(w http.ResponseWriter, r *http.Request) {
|
||||||
switch r.URL.Path {
|
switch r.URL.Path {
|
||||||
case "/write":
|
case "/write":
|
||||||
|
|
@ -1133,8 +1146,12 @@ func TestDBRPTagsCreateDatabaseCalledOnDatabaseNotFound(t *testing.T) {
|
||||||
|
|
||||||
err = output.Connect()
|
err = output.Connect()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// this write fails, but we're expecting it to drop the metrics and not retry, so no error.
|
||||||
err = output.Write(metrics)
|
err = output.Write(metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// expects write to succeed
|
||||||
err = output.Write(metrics)
|
err = output.Write(metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
//nolint
|
||||||
package influxdb
|
package influxdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
@ -224,17 +225,20 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
switch apiError := err.(type) {
|
switch apiError := err.(type) {
|
||||||
case *DatabaseNotFoundError:
|
case *DatabaseNotFoundError:
|
||||||
if !i.SkipDatabaseCreation {
|
if i.SkipDatabaseCreation {
|
||||||
allErrorsAreDatabaseNotFoundErrors = false
|
continue
|
||||||
err := client.CreateDatabase(ctx, apiError.Database)
|
|
||||||
if err != nil {
|
|
||||||
i.Log.Errorf("When writing to [%s]: database %q not found and failed to recreate",
|
|
||||||
client.URL(), apiError.Database)
|
|
||||||
} else {
|
|
||||||
// try another client, if all clients fail with this error, do not return error
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// retry control
|
||||||
|
// error so the write is retried
|
||||||
|
err := client.CreateDatabase(ctx, apiError.Database)
|
||||||
|
if err != nil {
|
||||||
|
i.Log.Errorf("When writing to [%s]: database %q not found and failed to recreate",
|
||||||
|
client.URL(), apiError.Database)
|
||||||
|
} else {
|
||||||
|
return errors.New("database created; retry write")
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
allErrorsAreDatabaseNotFoundErrors = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue