chore(config): Split tests to avoid cyclic import (#13371)

This commit is contained in:
Sven Rebhan 2023-06-02 12:32:10 +02:00 committed by GitHub
parent 4a8b1473f7
commit c3ce6a8e12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 235 additions and 247 deletions

View File

@ -1109,7 +1109,7 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator
streamingProcessor := creator()
var processor interface{}
if p, ok := streamingProcessor.(unwrappable); ok {
if p, ok := streamingProcessor.(processors.HasUnwrap); ok {
processor = p.Unwrap()
} else {
processor = streamingProcessor
@ -1721,10 +1721,3 @@ func (c *Config) firstErr() error {
func (c *Config) addError(tbl *ast.Table, err error) {
c.errs = append(c.errs, fmt.Errorf("line %d:%d: %w", tbl.Line, tbl.Position, err))
}
// unwrappable lets you retrieve the original telegraf.Processor from the
// StreamingProcessor. This is necessary because the toml Unmarshaller won't
// look inside composed types.
type unwrappable interface {
Unwrap() telegraf.Processor
}

View File

@ -1,10 +1,8 @@
package config
package config_test
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
@ -21,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/persister"
@ -56,14 +55,14 @@ func TestReadBinaryFile(t *testing.T) {
err = cmd.Run()
require.NoError(t, err, fmt.Sprintf("stdout: %s, stderr: %s", outb.String(), errb.String()))
c := NewConfig()
c := config.NewConfig()
err = c.LoadConfig(binaryFile)
require.Error(t, err)
require.ErrorContains(t, err, "provided config is not a TOML file")
}
func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
t.Setenv("MY_TEST_SERVER", "192.168.1.1")
t.Setenv("TEST_INTERVAL", "10s")
require.NoError(t, c.LoadConfig("./testdata/single_plugin_env_vars.toml"))
@ -107,98 +106,8 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) {
require.Equal(t, inputConfig, c.Inputs[0].Config, "Testdata did not produce correct input metadata.")
}
func Test_envSub(t *testing.T) {
tests := []struct {
name string
setEnv func(*testing.T)
contents string
expected string
wantErr bool
errSubstring string
}{
{
name: "Legacy with ${} and without {}",
setEnv: func(t *testing.T) {
t.Setenv("TEST_ENV1", "VALUE1")
t.Setenv("TEST_ENV2", "VALUE2")
},
contents: "A string with ${TEST_ENV1}, $TEST_ENV2 and $TEST_ENV1 as repeated",
expected: "A string with VALUE1, VALUE2 and VALUE1 as repeated",
},
{
name: "Env not set",
contents: "Env variable ${NOT_SET} will be empty",
expected: "Env variable will be empty", // Two spaces present
},
{
name: "Env not set, fallback to default",
contents: "Env variable ${THIS_IS_ABSENT:-Fallback}",
expected: "Env variable Fallback",
},
{
name: "No fallback",
setEnv: func(t *testing.T) {
t.Setenv("MY_ENV1", "VALUE1")
},
contents: "Env variable ${MY_ENV1:-Fallback}",
expected: "Env variable VALUE1",
},
{
name: "Mix and match",
setEnv: func(t *testing.T) {
t.Setenv("MY_VAR", "VALUE")
t.Setenv("MY_VAR2", "VALUE2")
},
contents: "Env var ${MY_VAR} is set, with $MY_VAR syntax and default on this ${MY_VAR1:-Substituted}, no default on this ${MY_VAR2:-NoDefault}",
expected: "Env var VALUE is set, with VALUE syntax and default on this Substituted, no default on this VALUE2",
},
{
name: "Default has special chars",
contents: `Not recommended but supported ${MY_VAR:-Default with special chars Supported#$\"}`,
expected: `Not recommended but supported Default with special chars Supported#$\"`, // values are escaped
},
{
name: "unset error",
contents: "Contains ${THIS_IS_NOT_SET?unset-error}",
wantErr: true,
errSubstring: "unset-error",
},
{
name: "env empty error",
setEnv: func(t *testing.T) {
t.Setenv("ENV_EMPTY", "")
},
contents: "Contains ${ENV_EMPTY:?empty-error}",
wantErr: true,
errSubstring: "empty-error",
},
{
name: "Fallback as env variable",
setEnv: func(t *testing.T) {
t.Setenv("FALLBACK", "my-fallback")
},
contents: "Should output ${NOT_SET:-${FALLBACK}}",
expected: "Should output my-fallback",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setEnv != nil {
tt.setEnv(t)
}
actual, err := substituteEnvironment([]byte(tt.contents))
if tt.wantErr {
require.ErrorContains(t, err, tt.errSubstring)
return
}
require.EqualValues(t, tt.expected, string(actual))
})
}
}
func TestConfig_LoadSingleInput(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/single_plugin.toml"))
input := inputs.Inputs["memcached"]().(*MockupInputPlugin)
@ -239,9 +148,9 @@ func TestConfig_LoadSingleInput(t *testing.T) {
}
func TestConfig_LoadDirectory(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
files, err := WalkDirectory("./testdata/subconfig")
files, err := config.WalkDirectory("./testdata/subconfig")
files = append([]string{"./testdata/single_plugin.toml"}, files...)
require.NoError(t, err)
require.NoError(t, c.LoadAll(files...))
@ -367,31 +276,31 @@ func TestConfig_LoadDirectory(t *testing.T) {
}
func TestConfig_WrongCertPath(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
require.Error(t, c.LoadConfig("./testdata/wrong_cert_path.toml"))
}
func TestConfig_DefaultParser(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/default_parser.toml"))
}
func TestConfig_DefaultExecParser(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/default_parser_exec.toml"))
}
func TestConfig_LoadSpecialTypes(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/special_types.toml"))
require.Len(t, c.Inputs, 1)
input, ok := c.Inputs[0].Input.(*MockupInputPlugin)
require.True(t, ok)
// Tests telegraf duration parsing.
require.Equal(t, Duration(time.Second), input.WriteTimeout)
// Tests telegraf config.Duration parsing.
require.Equal(t, config.Duration(time.Second), input.WriteTimeout)
// Tests telegraf size parsing.
require.Equal(t, Size(1024*1024), input.MaxBodySize)
require.Equal(t, config.Size(1024*1024), input.MaxBodySize)
// Tests toml multiline basic strings on single line.
require.Equal(t, "./testdata/special_types.pem", input.TLSCert)
// Tests toml multiline basic strings on single line.
@ -460,7 +369,7 @@ func TestConfig_FieldNotDefined(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
err := c.LoadConfig(tt.filename)
require.ErrorContains(t, err, tt.expected)
})
@ -468,32 +377,22 @@ func TestConfig_FieldNotDefined(t *testing.T) {
}
func TestConfig_WrongFieldType(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
err := c.LoadConfig("./testdata/wrong_field_type.toml")
require.Error(t, err, "invalid field type")
require.Equal(
t,
"error loading config file ./testdata/wrong_field_type.toml: error parsing http_listener_v2, line 2: "+
"(config.MockupInputPlugin.Port) cannot unmarshal TOML string into int",
err.Error(),
)
require.ErrorContains(t, err, "cannot unmarshal TOML string into int")
c = NewConfig()
c = config.NewConfig()
err = c.LoadConfig("./testdata/wrong_field_type2.toml")
require.Error(t, err, "invalid field type2")
require.Equal(
t,
"error loading config file ./testdata/wrong_field_type2.toml: error parsing http_listener_v2, line 2: "+
"(config.MockupInputPlugin.Methods) cannot unmarshal TOML string into []string",
err.Error(),
)
require.ErrorContains(t, err, "cannot unmarshal TOML string into []string")
}
func TestConfig_InlineTables(t *testing.T) {
// #4098
t.Setenv("TOKEN", "test")
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/inline_table.toml"))
require.Len(t, c.Outputs, 2)
@ -506,7 +405,7 @@ func TestConfig_InlineTables(t *testing.T) {
func TestConfig_SliceComment(t *testing.T) {
t.Skipf("Skipping until #3642 is resolved")
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/slice_comment.toml"))
require.Len(t, c.Outputs, 1)
@ -518,7 +417,7 @@ func TestConfig_SliceComment(t *testing.T) {
func TestConfig_BadOrdering(t *testing.T) {
// #3444: when not using inline tables, care has to be taken so subsequent configuration
// doesn't become part of the table. This is not a bug, but TOML syntax.
c := NewConfig()
c := config.NewConfig()
err := c.LoadConfig("./testdata/non_slice_slice.toml")
require.Error(t, err, "bad ordering")
require.Equal(
@ -530,7 +429,7 @@ func TestConfig_BadOrdering(t *testing.T) {
func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) {
// #8256 Cannot use empty string as the namespace prefix
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/azure_monitor.toml"))
require.Len(t, c.Outputs, 2)
@ -542,59 +441,8 @@ func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) {
}
}
func TestConfig_URLRetries3Fails(t *testing.T) {
httpLoadConfigRetryInterval = 0 * time.Second
responseCounter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
responseCounter++
}))
defer ts.Close()
expected := fmt.Sprintf("error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL)
c := NewConfig()
err := c.LoadConfig(ts.URL)
require.Error(t, err)
require.Equal(t, expected, err.Error())
require.Equal(t, 4, responseCounter)
}
func TestConfig_URLRetries3FailsThenPasses(t *testing.T) {
httpLoadConfigRetryInterval = 0 * time.Second
responseCounter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if responseCounter <= 2 {
w.WriteHeader(http.StatusNotFound)
} else {
w.WriteHeader(http.StatusOK)
}
responseCounter++
}))
defer ts.Close()
c := NewConfig()
require.NoError(t, c.LoadConfig(ts.URL))
require.Equal(t, 4, responseCounter)
}
func TestConfig_getDefaultConfigPathFromEnvURL(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
c := NewConfig()
t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL)
configPath, err := getDefaultConfigPath()
require.NoError(t, err)
require.Equal(t, []string{ts.URL}, configPath)
err = c.LoadConfig("")
require.NoError(t, err)
}
func TestConfig_URLLikeFileName(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
err := c.LoadConfig("http:##www.example.com.conf")
require.Error(t, err)
@ -611,7 +459,7 @@ func TestConfig_URLLikeFileName(t *testing.T) {
}
func TestConfig_Filtering(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadAll("./testdata/filter_metricpass.toml"))
require.Len(t, c.Processors, 1)
@ -685,7 +533,7 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
"wavefront",
}
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/serializers_new.toml"))
require.Len(t, c.Outputs, len(formats))
@ -776,7 +624,7 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
"wavefront",
}
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/serializers_old.toml"))
require.Len(t, c.Outputs, len(formats))
@ -872,7 +720,7 @@ func TestConfig_ParserInterface(t *testing.T) {
"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf",
}
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/parsers_new.toml"))
require.Len(t, c.Inputs, len(formats))
@ -1039,7 +887,7 @@ func TestConfig_MultipleProcessorsOrder(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
filenames := make([]string, 0, len(test.filename))
for _, fn := range test.filename {
filenames = append(filenames, filepath.Join("./testdata/processor_order", fn))
@ -1078,7 +926,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) {
"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf",
}
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadAll("./testdata/processors_with_parsers.toml"))
require.Len(t, c.Processors, len(formats))
@ -1127,7 +975,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) {
generated := make([]interface{}, 0)
for _, plugin := range c.Processors {
var processorIF telegraf.Processor
if p, ok := plugin.Processor.(unwrappable); ok {
if p, ok := plugin.Processor.(processors.HasUnwrap); ok {
processorIF = p.Unwrap()
} else {
processorIF = plugin.Processor.(telegraf.Processor)
@ -1181,7 +1029,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) {
}
func TestConfigPluginIDsDifferent(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
c.Agent.Statefile = "/dev/null"
require.NoError(t, c.LoadConfig("./testdata/state_persistence_input_all_different.toml"))
require.NotEmpty(t, c.Inputs)
@ -1204,7 +1052,7 @@ func TestConfigPluginIDsDifferent(t *testing.T) {
}
func TestConfigPluginIDsSame(t *testing.T) {
c := NewConfig()
c := config.NewConfig()
c.Agent.Statefile = "/dev/null"
require.NoError(t, c.LoadConfig("./testdata/state_persistence_input_all_same.toml"))
require.NotEmpty(t, c.Inputs)
@ -1231,7 +1079,7 @@ func TestPersisterInputStoreLoad(t *testing.T) {
defer os.Remove(filename)
// Load the plugins
cstore := NewConfig()
cstore := config.NewConfig()
require.NoError(t, cstore.LoadConfig("testdata/state_persistence_input_store_load.toml"))
// Initialize the persister for storing the state
@ -1263,7 +1111,7 @@ func TestPersisterInputStoreLoad(t *testing.T) {
require.NoError(t, persisterStore.Store())
// Load the plugins
cload := NewConfig()
cload := config.NewConfig()
require.NoError(t, cload.LoadConfig("testdata/state_persistence_input_store_load.toml"))
require.Len(t, cload.Inputs, len(expected))
@ -1297,7 +1145,7 @@ func TestPersisterInputStoreLoad(t *testing.T) {
func TestPersisterProcessorRegistration(t *testing.T) {
// Load the plugins
c := NewConfig()
c := config.NewConfig()
require.NoError(t, c.LoadConfig("testdata/state_persistence_processors.toml"))
require.NotEmpty(t, c.Processors)
require.NotEmpty(t, c.AggProcessors)
@ -1310,7 +1158,7 @@ func TestPersisterProcessorRegistration(t *testing.T) {
// Register the processors
for _, plugin := range c.Processors {
unwrapped := plugin.Processor.(unwrappable).Unwrap()
unwrapped := plugin.Processor.(processors.HasUnwrap).Unwrap()
p := unwrapped.(*MockupProcessorPlugin)
require.NoError(t, dut.Register(plugin.ID(), p))
@ -1318,7 +1166,7 @@ func TestPersisterProcessorRegistration(t *testing.T) {
// Register the after-aggregator processors
for _, plugin := range c.AggProcessors {
unwrapped := plugin.Processor.(unwrappable).Unwrap()
unwrapped := plugin.Processor.(processors.HasUnwrap).Unwrap()
p := unwrapped.(*MockupProcessorPlugin)
require.NoError(t, dut.Register(plugin.ID(), p))
@ -1346,14 +1194,14 @@ func (m *MockupInputPluginParserNew) SetParserFunc(f telegraf.ParserFunc) {
/*** Mockup INPUT plugin for testing to avoid cyclic dependencies ***/
type MockupInputPlugin struct {
Servers []string `toml:"servers"`
Methods []string `toml:"methods"`
Timeout Duration `toml:"timeout"`
ReadTimeout Duration `toml:"read_timeout"`
WriteTimeout Duration `toml:"write_timeout"`
MaxBodySize Size `toml:"max_body_size"`
Paths []string `toml:"paths"`
Port int `toml:"port"`
Servers []string `toml:"servers"`
Methods []string `toml:"methods"`
Timeout config.Duration `toml:"timeout"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
MaxBodySize config.Size `toml:"max_body_size"`
Paths []string `toml:"paths"`
Port int `toml:"port"`
Command string
Files []string
PidFile string
@ -1646,7 +1494,7 @@ func init() {
return &MockupInputPluginParserFunc{}
})
inputs.Add("exec", func() telegraf.Input {
return &MockupInputPlugin{Timeout: Duration(time.Second * 5)}
return &MockupInputPlugin{Timeout: config.Duration(time.Second * 5)}
})
inputs.Add("file", func() telegraf.Input {
return &MockupInputPlugin{}

152
config/internal_test.go Normal file
View File

@ -0,0 +1,152 @@
package config
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestEnvironmentSubstitution(t *testing.T) {
tests := []struct {
name string
setEnv func(*testing.T)
contents string
expected string
wantErr bool
errSubstring string
}{
{
name: "Legacy with ${} and without {}",
setEnv: func(t *testing.T) {
t.Setenv("TEST_ENV1", "VALUE1")
t.Setenv("TEST_ENV2", "VALUE2")
},
contents: "A string with ${TEST_ENV1}, $TEST_ENV2 and $TEST_ENV1 as repeated",
expected: "A string with VALUE1, VALUE2 and VALUE1 as repeated",
},
{
name: "Env not set",
contents: "Env variable ${NOT_SET} will be empty",
expected: "Env variable will be empty", // Two spaces present
},
{
name: "Env not set, fallback to default",
contents: "Env variable ${THIS_IS_ABSENT:-Fallback}",
expected: "Env variable Fallback",
},
{
name: "No fallback",
setEnv: func(t *testing.T) {
t.Setenv("MY_ENV1", "VALUE1")
},
contents: "Env variable ${MY_ENV1:-Fallback}",
expected: "Env variable VALUE1",
},
{
name: "Mix and match",
setEnv: func(t *testing.T) {
t.Setenv("MY_VAR", "VALUE")
t.Setenv("MY_VAR2", "VALUE2")
},
contents: "Env var ${MY_VAR} is set, with $MY_VAR syntax and default on this ${MY_VAR1:-Substituted}, no default on this ${MY_VAR2:-NoDefault}",
expected: "Env var VALUE is set, with VALUE syntax and default on this Substituted, no default on this VALUE2",
},
{
name: "Default has special chars",
contents: `Not recommended but supported ${MY_VAR:-Default with special chars Supported#$\"}`,
expected: `Not recommended but supported Default with special chars Supported#$\"`, // values are escaped
},
{
name: "unset error",
contents: "Contains ${THIS_IS_NOT_SET?unset-error}",
wantErr: true,
errSubstring: "unset-error",
},
{
name: "env empty error",
setEnv: func(t *testing.T) {
t.Setenv("ENV_EMPTY", "")
},
contents: "Contains ${ENV_EMPTY:?empty-error}",
wantErr: true,
errSubstring: "empty-error",
},
{
name: "Fallback as env variable",
setEnv: func(t *testing.T) {
t.Setenv("FALLBACK", "my-fallback")
},
contents: "Should output ${NOT_SET:-${FALLBACK}}",
expected: "Should output my-fallback",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.setEnv != nil {
tt.setEnv(t)
}
actual, err := substituteEnvironment([]byte(tt.contents))
if tt.wantErr {
require.ErrorContains(t, err, tt.errSubstring)
return
}
require.EqualValues(t, tt.expected, string(actual))
})
}
}
func TestURLRetries3Fails(t *testing.T) {
httpLoadConfigRetryInterval = 0 * time.Second
responseCounter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
responseCounter++
}))
defer ts.Close()
expected := fmt.Sprintf("error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL)
c := NewConfig()
err := c.LoadConfig(ts.URL)
require.Error(t, err)
require.Equal(t, expected, err.Error())
require.Equal(t, 4, responseCounter)
}
func TestURLRetries3FailsThenPasses(t *testing.T) {
httpLoadConfigRetryInterval = 0 * time.Second
responseCounter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if responseCounter <= 2 {
w.WriteHeader(http.StatusNotFound)
} else {
w.WriteHeader(http.StatusOK)
}
responseCounter++
}))
defer ts.Close()
c := NewConfig()
require.NoError(t, c.LoadConfig(ts.URL))
require.Equal(t, 4, responseCounter)
}
func TestConfig_getDefaultConfigPathFromEnvURL(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
c := NewConfig()
t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL)
configPath, err := getDefaultConfigPath()
require.NoError(t, err)
require.Equal(t, []string{ts.URL}, configPath)
err = c.LoadConfig("")
require.NoError(t, err)
}

View File

@ -112,7 +112,7 @@ func createPluginsWithTomlConfig(md toml.MetaData, conf config) (loadedConfig, e
if len(primitives) > 0 {
primitive := primitives[0]
var p telegraf.PluginDescriber = plugin
if processor, ok := plugin.(unwrappable); ok {
if processor, ok := plugin.(processors.HasUnwrap); ok {
p = processor.Unwrap()
}
if err := md.PrimitiveDecode(primitive, p); err != nil {
@ -168,7 +168,3 @@ func DefaultImportedPlugins() config {
}
return conf
}
type unwrappable interface {
Unwrap() telegraf.Processor
}

View File

@ -249,7 +249,7 @@ func (adx *AzureDataExplorer) Init() error {
}
serializer := &json.Serializer{
TimestampUnits: time.Nanosecond,
TimestampUnits: config.Duration(time.Nanosecond),
TimestampFormat: time.RFC3339Nano,
}
if err := serializer.Init(); err != nil {

View File

@ -105,9 +105,7 @@ func TestWrite(t *testing.T) {
for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
serializer := &telegrafJson.Serializer{
TimestampUnits: time.Second,
}
serializer := &telegrafJson.Serializer{}
require.NoError(t, serializer.Init())
ingestionType := "queued"
@ -158,9 +156,7 @@ func TestWrite(t *testing.T) {
}
func TestCreateAzureDataExplorerTable(t *testing.T) {
serializer := &telegrafJson.Serializer{
TimestampUnits: time.Second,
}
serializer := &telegrafJson.Serializer{}
require.NoError(t, serializer.Init())
plugin := AzureDataExplorer{
Endpoint: "someendpoint",
@ -255,9 +251,7 @@ func TestWriteWithType(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
serializer := &telegrafJson.Serializer{
TimestampUnits: time.Second,
}
serializer := &telegrafJson.Serializer{}
require.NoError(t, serializer.Init())
for tableName, jsonValue := range testCase.tableNameToExpectedResult {
ingestionType := "queued"

View File

@ -42,9 +42,7 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt
/* End wrapper interface */
func TestInitAndWrite(t *testing.T) {
serializer := &json.Serializer{
TimestampUnits: time.Second,
}
serializer := &json.Serializer{}
require.NoError(t, serializer.Init())
mockHub := &mockEventHub{}
@ -103,9 +101,7 @@ func TestInitAndWriteIntegration(t *testing.T) {
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name
// Configure the plugin to target the newly created hub
serializer := &json.Serializer{
TimestampUnits: time.Second,
}
serializer := &json.Serializer{}
require.NoError(t, serializer.Init())
e := &EventHubs{
Hub: &eventHub{},

View File

@ -666,7 +666,7 @@ func TestBatchedUnbatched(t *testing.T) {
influxSerializer := &influx.Serializer{}
require.NoError(t, influxSerializer.Init())
jsonSerializer := &json.Serializer{TimestampUnits: time.Second}
jsonSerializer := &json.Serializer{}
require.NoError(t, jsonSerializer.Init())
s := map[string]serializers.Serializer{

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/testutil"
)
@ -30,7 +31,7 @@ func TestConnectAndWrite(t *testing.T) {
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s := &json.Serializer{
TimestampUnits: 10 * time.Second,
TimestampUnits: config.Duration(10 * time.Second),
TimestampFormat: "yyy-dd-mmThh:mm:ss",
}
require.NoError(t, s.Init())

View File

@ -84,8 +84,7 @@ func TestCases(t *testing.T) {
require.NoError(t, cfg.LoadConfig(configFilename))
require.Len(t, cfg.Processors, 1, "wrong number of processors")
type unwrappable interface{ Unwrap() telegraf.Processor }
proc := cfg.Processors[0].Processor.(unwrappable)
proc := cfg.Processors[0].Processor.(processors.HasUnwrap)
plugin := proc.Unwrap().(*Processor)
require.NoError(t, plugin.Init())
@ -154,8 +153,7 @@ func TestCasesTracking(t *testing.T) {
require.NoError(t, cfg.LoadConfig(configFilename))
require.Len(t, cfg.Processors, 1, "wrong number of processors")
type unwrappableProcessor interface{ Unwrap() telegraf.Processor }
proc := cfg.Processors[0].Processor.(unwrappableProcessor)
proc := cfg.Processors[0].Processor.(processors.HasUnwrap)
plugin := proc.Unwrap().(*Processor)
require.NoError(t, plugin.Init())

View File

@ -5,6 +5,12 @@ import "github.com/influxdata/telegraf"
type Creator func() telegraf.Processor
type StreamingCreator func() telegraf.StreamingProcessor
// HasUnwrap indicates the presence of an Unwrap() function to retrieve the
// underlying telegraf.Processor.
type HasUnwrap interface {
Unwrap() telegraf.Processor
}
// all processors are streaming processors.
// telegraf.Processor processors are upgraded to telegraf.StreamingProcessor
var Processors = map[string]StreamingCreator{}

View File

@ -10,16 +10,17 @@ import (
"github.com/blues/jsonata-go"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/serializers"
)
type Serializer struct {
TimestampUnits time.Duration `toml:"json_timestamp_units"`
TimestampFormat string `toml:"json_timestamp_format"`
Transformation string `toml:"json_transformation"`
NestedFieldsInclude []string `toml:"json_nested_fields_include"`
NestedFieldsExclude []string `toml:"json_nested_fields_exclude"`
TimestampUnits config.Duration `toml:"json_timestamp_units"`
TimestampFormat string `toml:"json_timestamp_format"`
Transformation string `toml:"json_transformation"`
NestedFieldsInclude []string `toml:"json_nested_fields_include"`
NestedFieldsExclude []string `toml:"json_nested_fields_exclude"`
nestedfields filter.Filter
}
@ -27,18 +28,20 @@ type Serializer struct {
func (s *Serializer) Init() error {
// Default precision is 1s
if s.TimestampUnits <= 0 {
s.TimestampUnits = time.Second
s.TimestampUnits = config.Duration(time.Second)
}
// Search for the power of ten less than the duration
d := time.Nanosecond
t := time.Duration(s.TimestampUnits)
for {
if d*10 > s.TimestampUnits {
s.TimestampUnits = d
if d*10 > t {
t = d
break
}
d = d * 10
}
s.TimestampUnits = config.Duration(t)
if len(s.NestedFieldsInclude) > 0 || len(s.NestedFieldsExclude) > 0 {
f, err := filter.NewIncludeExcludeFilter(s.NestedFieldsInclude, s.NestedFieldsExclude)
@ -165,7 +168,7 @@ func init() {
// InitFromConfig is a compatibility function to construct the parser the old way
func (s *Serializer) InitFromConfig(cfg *serializers.Config) error {
s.TimestampUnits = cfg.TimestampUnits
s.TimestampUnits = config.Duration(cfg.TimestampUnits)
s.TimestampFormat = cfg.TimestampFormat
s.Transformation = cfg.Transformation
s.NestedFieldsInclude = cfg.JSONNestedFieldInclude

View File

@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
@ -91,7 +92,7 @@ func TestSerialize_TimestampUnits(t *testing.T) {
time.Unix(1525478795, 123456789),
)
s := Serializer{
TimestampUnits: tt.timestampUnits,
TimestampUnits: config.Duration(tt.timestampUnits),
TimestampFormat: tt.timestampFormat,
}
require.NoError(t, s.Init())
@ -270,7 +271,7 @@ func TestSerializeTransformationNonBatch(t *testing.T) {
// Serialize
serializer := Serializer{
TimestampUnits: cfg.TimestampUnits,
TimestampUnits: config.Duration(cfg.TimestampUnits),
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
}
@ -318,7 +319,7 @@ func TestSerializeTransformationBatch(t *testing.T) {
// Serialize
serializer := Serializer{
TimestampUnits: cfg.TimestampUnits,
TimestampUnits: config.Duration(cfg.TimestampUnits),
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
}
@ -432,7 +433,7 @@ func TestSerializeNesting(t *testing.T) {
// Serialize
serializer := Serializer{
TimestampUnits: cfg.TimestampUnits,
TimestampUnits: config.Duration(cfg.TimestampUnits),
TimestampFormat: cfg.TimestampFormat,
Transformation: cfg.Transformation,
NestedFieldsInclude: cfg.JSONNestedFieldsInclude,