From c3ce6a8e12972f508d39323033753df8adbfe7d9 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Fri, 2 Jun 2023 12:32:10 +0200 Subject: [PATCH] chore(config): Split tests to avoid cyclic import (#13371) --- config/config.go | 9 +- config/config_test.go | 246 ++++-------------- config/internal_test.go | 152 +++++++++++ plugins/common/shim/config.go | 6 +- .../azure_data_explorer.go | 2 +- .../azure_data_explorer_test.go | 12 +- plugins/outputs/event_hubs/event_hubs_test.go | 8 +- plugins/outputs/http/http_test.go | 2 +- plugins/outputs/stomp/stomp_test.go | 3 +- plugins/processors/lookup/lookup_test.go | 6 +- plugins/processors/registry.go | 6 + plugins/serializers/json/json.go | 21 +- plugins/serializers/json/json_test.go | 9 +- 13 files changed, 235 insertions(+), 247 deletions(-) create mode 100644 config/internal_test.go diff --git a/config/config.go b/config/config.go index 0289cb835..ab00d32f3 100644 --- a/config/config.go +++ b/config/config.go @@ -1109,7 +1109,7 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator streamingProcessor := creator() var processor interface{} - if p, ok := streamingProcessor.(unwrappable); ok { + if p, ok := streamingProcessor.(processors.HasUnwrap); ok { processor = p.Unwrap() } else { processor = streamingProcessor @@ -1721,10 +1721,3 @@ func (c *Config) firstErr() error { func (c *Config) addError(tbl *ast.Table, err error) { c.errs = append(c.errs, fmt.Errorf("line %d:%d: %w", tbl.Line, tbl.Position, err)) } - -// unwrappable lets you retrieve the original telegraf.Processor from the -// StreamingProcessor. This is necessary because the toml Unmarshaller won't -// look inside composed types. -type unwrappable interface { - Unwrap() telegraf.Processor -} diff --git a/config/config_test.go b/config/config_test.go index a6c5de9fc..6937ee502 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1,10 +1,8 @@ -package config +package config_test import ( "bytes" "fmt" - "net/http" - "net/http/httptest" "os" "os/exec" "path/filepath" @@ -21,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/persister" @@ -56,14 +55,14 @@ func TestReadBinaryFile(t *testing.T) { err = cmd.Run() require.NoError(t, err, fmt.Sprintf("stdout: %s, stderr: %s", outb.String(), errb.String())) - c := NewConfig() + c := config.NewConfig() err = c.LoadConfig(binaryFile) require.Error(t, err) require.ErrorContains(t, err, "provided config is not a TOML file") } func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) { - c := NewConfig() + c := config.NewConfig() t.Setenv("MY_TEST_SERVER", "192.168.1.1") t.Setenv("TEST_INTERVAL", "10s") require.NoError(t, c.LoadConfig("./testdata/single_plugin_env_vars.toml")) @@ -107,98 +106,8 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) { require.Equal(t, inputConfig, c.Inputs[0].Config, "Testdata did not produce correct input metadata.") } -func Test_envSub(t *testing.T) { - tests := []struct { - name string - setEnv func(*testing.T) - contents string - expected string - wantErr bool - errSubstring string - }{ - { - name: "Legacy with ${} and without {}", - setEnv: func(t *testing.T) { - t.Setenv("TEST_ENV1", "VALUE1") - t.Setenv("TEST_ENV2", "VALUE2") - }, - contents: "A string with ${TEST_ENV1}, $TEST_ENV2 and $TEST_ENV1 as repeated", - expected: "A string with VALUE1, VALUE2 and VALUE1 as repeated", - }, - { - name: "Env not set", - contents: "Env variable ${NOT_SET} will be empty", - expected: "Env variable will be empty", // Two spaces present - }, - { - name: "Env not set, fallback to default", - contents: "Env variable ${THIS_IS_ABSENT:-Fallback}", - expected: "Env variable Fallback", - }, - { - name: "No fallback", - setEnv: func(t *testing.T) { - t.Setenv("MY_ENV1", "VALUE1") - }, - contents: "Env variable ${MY_ENV1:-Fallback}", - expected: "Env variable VALUE1", - }, - { - name: "Mix and match", - setEnv: func(t *testing.T) { - t.Setenv("MY_VAR", "VALUE") - t.Setenv("MY_VAR2", "VALUE2") - }, - contents: "Env var ${MY_VAR} is set, with $MY_VAR syntax and default on this ${MY_VAR1:-Substituted}, no default on this ${MY_VAR2:-NoDefault}", - expected: "Env var VALUE is set, with VALUE syntax and default on this Substituted, no default on this VALUE2", - }, - { - name: "Default has special chars", - contents: `Not recommended but supported ${MY_VAR:-Default with special chars Supported#$\"}`, - expected: `Not recommended but supported Default with special chars Supported#$\"`, // values are escaped - }, - { - name: "unset error", - contents: "Contains ${THIS_IS_NOT_SET?unset-error}", - wantErr: true, - errSubstring: "unset-error", - }, - { - name: "env empty error", - setEnv: func(t *testing.T) { - t.Setenv("ENV_EMPTY", "") - }, - contents: "Contains ${ENV_EMPTY:?empty-error}", - wantErr: true, - errSubstring: "empty-error", - }, - { - name: "Fallback as env variable", - setEnv: func(t *testing.T) { - t.Setenv("FALLBACK", "my-fallback") - }, - contents: "Should output ${NOT_SET:-${FALLBACK}}", - expected: "Should output my-fallback", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.setEnv != nil { - tt.setEnv(t) - } - actual, err := substituteEnvironment([]byte(tt.contents)) - if tt.wantErr { - require.ErrorContains(t, err, tt.errSubstring) - return - } - require.EqualValues(t, tt.expected, string(actual)) - }) - } -} - func TestConfig_LoadSingleInput(t *testing.T) { - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/single_plugin.toml")) input := inputs.Inputs["memcached"]().(*MockupInputPlugin) @@ -239,9 +148,9 @@ func TestConfig_LoadSingleInput(t *testing.T) { } func TestConfig_LoadDirectory(t *testing.T) { - c := NewConfig() + c := config.NewConfig() - files, err := WalkDirectory("./testdata/subconfig") + files, err := config.WalkDirectory("./testdata/subconfig") files = append([]string{"./testdata/single_plugin.toml"}, files...) require.NoError(t, err) require.NoError(t, c.LoadAll(files...)) @@ -367,31 +276,31 @@ func TestConfig_LoadDirectory(t *testing.T) { } func TestConfig_WrongCertPath(t *testing.T) { - c := NewConfig() + c := config.NewConfig() require.Error(t, c.LoadConfig("./testdata/wrong_cert_path.toml")) } func TestConfig_DefaultParser(t *testing.T) { - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/default_parser.toml")) } func TestConfig_DefaultExecParser(t *testing.T) { - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/default_parser_exec.toml")) } func TestConfig_LoadSpecialTypes(t *testing.T) { - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/special_types.toml")) require.Len(t, c.Inputs, 1) input, ok := c.Inputs[0].Input.(*MockupInputPlugin) require.True(t, ok) - // Tests telegraf duration parsing. - require.Equal(t, Duration(time.Second), input.WriteTimeout) + // Tests telegraf config.Duration parsing. + require.Equal(t, config.Duration(time.Second), input.WriteTimeout) // Tests telegraf size parsing. - require.Equal(t, Size(1024*1024), input.MaxBodySize) + require.Equal(t, config.Size(1024*1024), input.MaxBodySize) // Tests toml multiline basic strings on single line. require.Equal(t, "./testdata/special_types.pem", input.TLSCert) // Tests toml multiline basic strings on single line. @@ -460,7 +369,7 @@ func TestConfig_FieldNotDefined(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := NewConfig() + c := config.NewConfig() err := c.LoadConfig(tt.filename) require.ErrorContains(t, err, tt.expected) }) @@ -468,32 +377,22 @@ func TestConfig_FieldNotDefined(t *testing.T) { } func TestConfig_WrongFieldType(t *testing.T) { - c := NewConfig() + c := config.NewConfig() err := c.LoadConfig("./testdata/wrong_field_type.toml") require.Error(t, err, "invalid field type") - require.Equal( - t, - "error loading config file ./testdata/wrong_field_type.toml: error parsing http_listener_v2, line 2: "+ - "(config.MockupInputPlugin.Port) cannot unmarshal TOML string into int", - err.Error(), - ) + require.ErrorContains(t, err, "cannot unmarshal TOML string into int") - c = NewConfig() + c = config.NewConfig() err = c.LoadConfig("./testdata/wrong_field_type2.toml") require.Error(t, err, "invalid field type2") - require.Equal( - t, - "error loading config file ./testdata/wrong_field_type2.toml: error parsing http_listener_v2, line 2: "+ - "(config.MockupInputPlugin.Methods) cannot unmarshal TOML string into []string", - err.Error(), - ) + require.ErrorContains(t, err, "cannot unmarshal TOML string into []string") } func TestConfig_InlineTables(t *testing.T) { // #4098 t.Setenv("TOKEN", "test") - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/inline_table.toml")) require.Len(t, c.Outputs, 2) @@ -506,7 +405,7 @@ func TestConfig_InlineTables(t *testing.T) { func TestConfig_SliceComment(t *testing.T) { t.Skipf("Skipping until #3642 is resolved") - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/slice_comment.toml")) require.Len(t, c.Outputs, 1) @@ -518,7 +417,7 @@ func TestConfig_SliceComment(t *testing.T) { func TestConfig_BadOrdering(t *testing.T) { // #3444: when not using inline tables, care has to be taken so subsequent configuration // doesn't become part of the table. This is not a bug, but TOML syntax. - c := NewConfig() + c := config.NewConfig() err := c.LoadConfig("./testdata/non_slice_slice.toml") require.Error(t, err, "bad ordering") require.Equal( @@ -530,7 +429,7 @@ func TestConfig_BadOrdering(t *testing.T) { func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) { // #8256 Cannot use empty string as the namespace prefix - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/azure_monitor.toml")) require.Len(t, c.Outputs, 2) @@ -542,59 +441,8 @@ func TestConfig_AzureMonitorNamespacePrefix(t *testing.T) { } } -func TestConfig_URLRetries3Fails(t *testing.T) { - httpLoadConfigRetryInterval = 0 * time.Second - responseCounter := 0 - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNotFound) - responseCounter++ - })) - defer ts.Close() - - expected := fmt.Sprintf("error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL) - - c := NewConfig() - err := c.LoadConfig(ts.URL) - require.Error(t, err) - require.Equal(t, expected, err.Error()) - require.Equal(t, 4, responseCounter) -} - -func TestConfig_URLRetries3FailsThenPasses(t *testing.T) { - httpLoadConfigRetryInterval = 0 * time.Second - responseCounter := 0 - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if responseCounter <= 2 { - w.WriteHeader(http.StatusNotFound) - } else { - w.WriteHeader(http.StatusOK) - } - responseCounter++ - })) - defer ts.Close() - - c := NewConfig() - require.NoError(t, c.LoadConfig(ts.URL)) - require.Equal(t, 4, responseCounter) -} - -func TestConfig_getDefaultConfigPathFromEnvURL(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - })) - defer ts.Close() - - c := NewConfig() - t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL) - configPath, err := getDefaultConfigPath() - require.NoError(t, err) - require.Equal(t, []string{ts.URL}, configPath) - err = c.LoadConfig("") - require.NoError(t, err) -} - func TestConfig_URLLikeFileName(t *testing.T) { - c := NewConfig() + c := config.NewConfig() err := c.LoadConfig("http:##www.example.com.conf") require.Error(t, err) @@ -611,7 +459,7 @@ func TestConfig_URLLikeFileName(t *testing.T) { } func TestConfig_Filtering(t *testing.T) { - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadAll("./testdata/filter_metricpass.toml")) require.Len(t, c.Processors, 1) @@ -685,7 +533,7 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) { "wavefront", } - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/serializers_new.toml")) require.Len(t, c.Outputs, len(formats)) @@ -776,7 +624,7 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) { "wavefront", } - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/serializers_old.toml")) require.Len(t, c.Outputs, len(formats)) @@ -872,7 +720,7 @@ func TestConfig_ParserInterface(t *testing.T) { "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf", } - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/parsers_new.toml")) require.Len(t, c.Inputs, len(formats)) @@ -1039,7 +887,7 @@ func TestConfig_MultipleProcessorsOrder(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - c := NewConfig() + c := config.NewConfig() filenames := make([]string, 0, len(test.filename)) for _, fn := range test.filename { filenames = append(filenames, filepath.Join("./testdata/processor_order", fn)) @@ -1078,7 +926,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) { "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf", } - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadAll("./testdata/processors_with_parsers.toml")) require.Len(t, c.Processors, len(formats)) @@ -1127,7 +975,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) { generated := make([]interface{}, 0) for _, plugin := range c.Processors { var processorIF telegraf.Processor - if p, ok := plugin.Processor.(unwrappable); ok { + if p, ok := plugin.Processor.(processors.HasUnwrap); ok { processorIF = p.Unwrap() } else { processorIF = plugin.Processor.(telegraf.Processor) @@ -1181,7 +1029,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) { } func TestConfigPluginIDsDifferent(t *testing.T) { - c := NewConfig() + c := config.NewConfig() c.Agent.Statefile = "/dev/null" require.NoError(t, c.LoadConfig("./testdata/state_persistence_input_all_different.toml")) require.NotEmpty(t, c.Inputs) @@ -1204,7 +1052,7 @@ func TestConfigPluginIDsDifferent(t *testing.T) { } func TestConfigPluginIDsSame(t *testing.T) { - c := NewConfig() + c := config.NewConfig() c.Agent.Statefile = "/dev/null" require.NoError(t, c.LoadConfig("./testdata/state_persistence_input_all_same.toml")) require.NotEmpty(t, c.Inputs) @@ -1231,7 +1079,7 @@ func TestPersisterInputStoreLoad(t *testing.T) { defer os.Remove(filename) // Load the plugins - cstore := NewConfig() + cstore := config.NewConfig() require.NoError(t, cstore.LoadConfig("testdata/state_persistence_input_store_load.toml")) // Initialize the persister for storing the state @@ -1263,7 +1111,7 @@ func TestPersisterInputStoreLoad(t *testing.T) { require.NoError(t, persisterStore.Store()) // Load the plugins - cload := NewConfig() + cload := config.NewConfig() require.NoError(t, cload.LoadConfig("testdata/state_persistence_input_store_load.toml")) require.Len(t, cload.Inputs, len(expected)) @@ -1297,7 +1145,7 @@ func TestPersisterInputStoreLoad(t *testing.T) { func TestPersisterProcessorRegistration(t *testing.T) { // Load the plugins - c := NewConfig() + c := config.NewConfig() require.NoError(t, c.LoadConfig("testdata/state_persistence_processors.toml")) require.NotEmpty(t, c.Processors) require.NotEmpty(t, c.AggProcessors) @@ -1310,7 +1158,7 @@ func TestPersisterProcessorRegistration(t *testing.T) { // Register the processors for _, plugin := range c.Processors { - unwrapped := plugin.Processor.(unwrappable).Unwrap() + unwrapped := plugin.Processor.(processors.HasUnwrap).Unwrap() p := unwrapped.(*MockupProcessorPlugin) require.NoError(t, dut.Register(plugin.ID(), p)) @@ -1318,7 +1166,7 @@ func TestPersisterProcessorRegistration(t *testing.T) { // Register the after-aggregator processors for _, plugin := range c.AggProcessors { - unwrapped := plugin.Processor.(unwrappable).Unwrap() + unwrapped := plugin.Processor.(processors.HasUnwrap).Unwrap() p := unwrapped.(*MockupProcessorPlugin) require.NoError(t, dut.Register(plugin.ID(), p)) @@ -1346,14 +1194,14 @@ func (m *MockupInputPluginParserNew) SetParserFunc(f telegraf.ParserFunc) { /*** Mockup INPUT plugin for testing to avoid cyclic dependencies ***/ type MockupInputPlugin struct { - Servers []string `toml:"servers"` - Methods []string `toml:"methods"` - Timeout Duration `toml:"timeout"` - ReadTimeout Duration `toml:"read_timeout"` - WriteTimeout Duration `toml:"write_timeout"` - MaxBodySize Size `toml:"max_body_size"` - Paths []string `toml:"paths"` - Port int `toml:"port"` + Servers []string `toml:"servers"` + Methods []string `toml:"methods"` + Timeout config.Duration `toml:"timeout"` + ReadTimeout config.Duration `toml:"read_timeout"` + WriteTimeout config.Duration `toml:"write_timeout"` + MaxBodySize config.Size `toml:"max_body_size"` + Paths []string `toml:"paths"` + Port int `toml:"port"` Command string Files []string PidFile string @@ -1646,7 +1494,7 @@ func init() { return &MockupInputPluginParserFunc{} }) inputs.Add("exec", func() telegraf.Input { - return &MockupInputPlugin{Timeout: Duration(time.Second * 5)} + return &MockupInputPlugin{Timeout: config.Duration(time.Second * 5)} }) inputs.Add("file", func() telegraf.Input { return &MockupInputPlugin{} diff --git a/config/internal_test.go b/config/internal_test.go new file mode 100644 index 000000000..b6e943848 --- /dev/null +++ b/config/internal_test.go @@ -0,0 +1,152 @@ +package config + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEnvironmentSubstitution(t *testing.T) { + tests := []struct { + name string + setEnv func(*testing.T) + contents string + expected string + wantErr bool + errSubstring string + }{ + { + name: "Legacy with ${} and without {}", + setEnv: func(t *testing.T) { + t.Setenv("TEST_ENV1", "VALUE1") + t.Setenv("TEST_ENV2", "VALUE2") + }, + contents: "A string with ${TEST_ENV1}, $TEST_ENV2 and $TEST_ENV1 as repeated", + expected: "A string with VALUE1, VALUE2 and VALUE1 as repeated", + }, + { + name: "Env not set", + contents: "Env variable ${NOT_SET} will be empty", + expected: "Env variable will be empty", // Two spaces present + }, + { + name: "Env not set, fallback to default", + contents: "Env variable ${THIS_IS_ABSENT:-Fallback}", + expected: "Env variable Fallback", + }, + { + name: "No fallback", + setEnv: func(t *testing.T) { + t.Setenv("MY_ENV1", "VALUE1") + }, + contents: "Env variable ${MY_ENV1:-Fallback}", + expected: "Env variable VALUE1", + }, + { + name: "Mix and match", + setEnv: func(t *testing.T) { + t.Setenv("MY_VAR", "VALUE") + t.Setenv("MY_VAR2", "VALUE2") + }, + contents: "Env var ${MY_VAR} is set, with $MY_VAR syntax and default on this ${MY_VAR1:-Substituted}, no default on this ${MY_VAR2:-NoDefault}", + expected: "Env var VALUE is set, with VALUE syntax and default on this Substituted, no default on this VALUE2", + }, + { + name: "Default has special chars", + contents: `Not recommended but supported ${MY_VAR:-Default with special chars Supported#$\"}`, + expected: `Not recommended but supported Default with special chars Supported#$\"`, // values are escaped + }, + { + name: "unset error", + contents: "Contains ${THIS_IS_NOT_SET?unset-error}", + wantErr: true, + errSubstring: "unset-error", + }, + { + name: "env empty error", + setEnv: func(t *testing.T) { + t.Setenv("ENV_EMPTY", "") + }, + contents: "Contains ${ENV_EMPTY:?empty-error}", + wantErr: true, + errSubstring: "empty-error", + }, + { + name: "Fallback as env variable", + setEnv: func(t *testing.T) { + t.Setenv("FALLBACK", "my-fallback") + }, + contents: "Should output ${NOT_SET:-${FALLBACK}}", + expected: "Should output my-fallback", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.setEnv != nil { + tt.setEnv(t) + } + actual, err := substituteEnvironment([]byte(tt.contents)) + if tt.wantErr { + require.ErrorContains(t, err, tt.errSubstring) + return + } + require.EqualValues(t, tt.expected, string(actual)) + }) + } +} + +func TestURLRetries3Fails(t *testing.T) { + httpLoadConfigRetryInterval = 0 * time.Second + responseCounter := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + responseCounter++ + })) + defer ts.Close() + + expected := fmt.Sprintf("error loading config file %s: retry 3 of 3 failed to retrieve remote config: 404 Not Found", ts.URL) + + c := NewConfig() + err := c.LoadConfig(ts.URL) + require.Error(t, err) + require.Equal(t, expected, err.Error()) + require.Equal(t, 4, responseCounter) +} + +func TestURLRetries3FailsThenPasses(t *testing.T) { + httpLoadConfigRetryInterval = 0 * time.Second + responseCounter := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if responseCounter <= 2 { + w.WriteHeader(http.StatusNotFound) + } else { + w.WriteHeader(http.StatusOK) + } + responseCounter++ + })) + defer ts.Close() + + c := NewConfig() + require.NoError(t, c.LoadConfig(ts.URL)) + require.Equal(t, 4, responseCounter) +} + +func TestConfig_getDefaultConfigPathFromEnvURL(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + c := NewConfig() + t.Setenv("TELEGRAF_CONFIG_PATH", ts.URL) + configPath, err := getDefaultConfigPath() + require.NoError(t, err) + require.Equal(t, []string{ts.URL}, configPath) + err = c.LoadConfig("") + require.NoError(t, err) +} diff --git a/plugins/common/shim/config.go b/plugins/common/shim/config.go index bd676d890..f27d7deb5 100644 --- a/plugins/common/shim/config.go +++ b/plugins/common/shim/config.go @@ -112,7 +112,7 @@ func createPluginsWithTomlConfig(md toml.MetaData, conf config) (loadedConfig, e if len(primitives) > 0 { primitive := primitives[0] var p telegraf.PluginDescriber = plugin - if processor, ok := plugin.(unwrappable); ok { + if processor, ok := plugin.(processors.HasUnwrap); ok { p = processor.Unwrap() } if err := md.PrimitiveDecode(primitive, p); err != nil { @@ -168,7 +168,3 @@ func DefaultImportedPlugins() config { } return conf } - -type unwrappable interface { - Unwrap() telegraf.Processor -} diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index f09723415..cae09040c 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -249,7 +249,7 @@ func (adx *AzureDataExplorer) Init() error { } serializer := &json.Serializer{ - TimestampUnits: time.Nanosecond, + TimestampUnits: config.Duration(time.Nanosecond), TimestampFormat: time.RFC3339Nano, } if err := serializer.Init(); err != nil { diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index 6c39dc995..e8758d765 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -105,9 +105,7 @@ func TestWrite(t *testing.T) { for _, tC := range testCases { t.Run(tC.name, func(t *testing.T) { - serializer := &telegrafJson.Serializer{ - TimestampUnits: time.Second, - } + serializer := &telegrafJson.Serializer{} require.NoError(t, serializer.Init()) ingestionType := "queued" @@ -158,9 +156,7 @@ func TestWrite(t *testing.T) { } func TestCreateAzureDataExplorerTable(t *testing.T) { - serializer := &telegrafJson.Serializer{ - TimestampUnits: time.Second, - } + serializer := &telegrafJson.Serializer{} require.NoError(t, serializer.Init()) plugin := AzureDataExplorer{ Endpoint: "someendpoint", @@ -255,9 +251,7 @@ func TestWriteWithType(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - serializer := &telegrafJson.Serializer{ - TimestampUnits: time.Second, - } + serializer := &telegrafJson.Serializer{} require.NoError(t, serializer.Init()) for tableName, jsonValue := range testCase.tableNameToExpectedResult { ingestionType := "queued" diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index 80d48f6f5..6dcfb6bfe 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -42,9 +42,7 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt /* End wrapper interface */ func TestInitAndWrite(t *testing.T) { - serializer := &json.Serializer{ - TimestampUnits: time.Second, - } + serializer := &json.Serializer{} require.NoError(t, serializer.Init()) mockHub := &mockEventHub{} @@ -103,9 +101,7 @@ func TestInitAndWriteIntegration(t *testing.T) { testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name // Configure the plugin to target the newly created hub - serializer := &json.Serializer{ - TimestampUnits: time.Second, - } + serializer := &json.Serializer{} require.NoError(t, serializer.Init()) e := &EventHubs{ Hub: &eventHub{}, diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index aef008ce8..bfb928172 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -666,7 +666,7 @@ func TestBatchedUnbatched(t *testing.T) { influxSerializer := &influx.Serializer{} require.NoError(t, influxSerializer.Init()) - jsonSerializer := &json.Serializer{TimestampUnits: time.Second} + jsonSerializer := &json.Serializer{} require.NoError(t, jsonSerializer.Init()) s := map[string]serializers.Serializer{ diff --git a/plugins/outputs/stomp/stomp_test.go b/plugins/outputs/stomp/stomp_test.go index 7357edb6b..370269c43 100644 --- a/plugins/outputs/stomp/stomp_test.go +++ b/plugins/outputs/stomp/stomp_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/testutil" ) @@ -30,7 +31,7 @@ func TestConnectAndWrite(t *testing.T) { var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) s := &json.Serializer{ - TimestampUnits: 10 * time.Second, + TimestampUnits: config.Duration(10 * time.Second), TimestampFormat: "yyy-dd-mmThh:mm:ss", } require.NoError(t, s.Init()) diff --git a/plugins/processors/lookup/lookup_test.go b/plugins/processors/lookup/lookup_test.go index affc8ae88..929f63dae 100644 --- a/plugins/processors/lookup/lookup_test.go +++ b/plugins/processors/lookup/lookup_test.go @@ -84,8 +84,7 @@ func TestCases(t *testing.T) { require.NoError(t, cfg.LoadConfig(configFilename)) require.Len(t, cfg.Processors, 1, "wrong number of processors") - type unwrappable interface{ Unwrap() telegraf.Processor } - proc := cfg.Processors[0].Processor.(unwrappable) + proc := cfg.Processors[0].Processor.(processors.HasUnwrap) plugin := proc.Unwrap().(*Processor) require.NoError(t, plugin.Init()) @@ -154,8 +153,7 @@ func TestCasesTracking(t *testing.T) { require.NoError(t, cfg.LoadConfig(configFilename)) require.Len(t, cfg.Processors, 1, "wrong number of processors") - type unwrappableProcessor interface{ Unwrap() telegraf.Processor } - proc := cfg.Processors[0].Processor.(unwrappableProcessor) + proc := cfg.Processors[0].Processor.(processors.HasUnwrap) plugin := proc.Unwrap().(*Processor) require.NoError(t, plugin.Init()) diff --git a/plugins/processors/registry.go b/plugins/processors/registry.go index efade2966..687a29ea2 100644 --- a/plugins/processors/registry.go +++ b/plugins/processors/registry.go @@ -5,6 +5,12 @@ import "github.com/influxdata/telegraf" type Creator func() telegraf.Processor type StreamingCreator func() telegraf.StreamingProcessor +// HasUnwrap indicates the presence of an Unwrap() function to retrieve the +// underlying telegraf.Processor. +type HasUnwrap interface { + Unwrap() telegraf.Processor +} + // all processors are streaming processors. // telegraf.Processor processors are upgraded to telegraf.StreamingProcessor var Processors = map[string]StreamingCreator{} diff --git a/plugins/serializers/json/json.go b/plugins/serializers/json/json.go index 8a2201b24..8a90e5605 100644 --- a/plugins/serializers/json/json.go +++ b/plugins/serializers/json/json.go @@ -10,16 +10,17 @@ import ( "github.com/blues/jsonata-go" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/plugins/serializers" ) type Serializer struct { - TimestampUnits time.Duration `toml:"json_timestamp_units"` - TimestampFormat string `toml:"json_timestamp_format"` - Transformation string `toml:"json_transformation"` - NestedFieldsInclude []string `toml:"json_nested_fields_include"` - NestedFieldsExclude []string `toml:"json_nested_fields_exclude"` + TimestampUnits config.Duration `toml:"json_timestamp_units"` + TimestampFormat string `toml:"json_timestamp_format"` + Transformation string `toml:"json_transformation"` + NestedFieldsInclude []string `toml:"json_nested_fields_include"` + NestedFieldsExclude []string `toml:"json_nested_fields_exclude"` nestedfields filter.Filter } @@ -27,18 +28,20 @@ type Serializer struct { func (s *Serializer) Init() error { // Default precision is 1s if s.TimestampUnits <= 0 { - s.TimestampUnits = time.Second + s.TimestampUnits = config.Duration(time.Second) } // Search for the power of ten less than the duration d := time.Nanosecond + t := time.Duration(s.TimestampUnits) for { - if d*10 > s.TimestampUnits { - s.TimestampUnits = d + if d*10 > t { + t = d break } d = d * 10 } + s.TimestampUnits = config.Duration(t) if len(s.NestedFieldsInclude) > 0 || len(s.NestedFieldsExclude) > 0 { f, err := filter.NewIncludeExcludeFilter(s.NestedFieldsInclude, s.NestedFieldsExclude) @@ -165,7 +168,7 @@ func init() { // InitFromConfig is a compatibility function to construct the parser the old way func (s *Serializer) InitFromConfig(cfg *serializers.Config) error { - s.TimestampUnits = cfg.TimestampUnits + s.TimestampUnits = config.Duration(cfg.TimestampUnits) s.TimestampFormat = cfg.TimestampFormat s.Transformation = cfg.Transformation s.NestedFieldsInclude = cfg.JSONNestedFieldInclude diff --git a/plugins/serializers/json/json_test.go b/plugins/serializers/json/json_test.go index a78f7cbc0..3c7894903 100644 --- a/plugins/serializers/json/json_test.go +++ b/plugins/serializers/json/json_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" @@ -91,7 +92,7 @@ func TestSerialize_TimestampUnits(t *testing.T) { time.Unix(1525478795, 123456789), ) s := Serializer{ - TimestampUnits: tt.timestampUnits, + TimestampUnits: config.Duration(tt.timestampUnits), TimestampFormat: tt.timestampFormat, } require.NoError(t, s.Init()) @@ -270,7 +271,7 @@ func TestSerializeTransformationNonBatch(t *testing.T) { // Serialize serializer := Serializer{ - TimestampUnits: cfg.TimestampUnits, + TimestampUnits: config.Duration(cfg.TimestampUnits), TimestampFormat: cfg.TimestampFormat, Transformation: cfg.Transformation, } @@ -318,7 +319,7 @@ func TestSerializeTransformationBatch(t *testing.T) { // Serialize serializer := Serializer{ - TimestampUnits: cfg.TimestampUnits, + TimestampUnits: config.Duration(cfg.TimestampUnits), TimestampFormat: cfg.TimestampFormat, Transformation: cfg.Transformation, } @@ -432,7 +433,7 @@ func TestSerializeNesting(t *testing.T) { // Serialize serializer := Serializer{ - TimestampUnits: cfg.TimestampUnits, + TimestampUnits: config.Duration(cfg.TimestampUnits), TimestampFormat: cfg.TimestampFormat, Transformation: cfg.Transformation, NestedFieldsInclude: cfg.JSONNestedFieldsInclude,