feat(config): Allow reloading on URL config change (#15388)
This commit is contained in:
parent
9eeb4a845b
commit
0e636b729a
|
|
@ -225,6 +225,7 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
|
||||||
configDir: cCtx.StringSlice("config-directory"),
|
configDir: cCtx.StringSlice("config-directory"),
|
||||||
testWait: cCtx.Int("test-wait"),
|
testWait: cCtx.Int("test-wait"),
|
||||||
configURLRetryAttempts: cCtx.Int("config-url-retry-attempts"),
|
configURLRetryAttempts: cCtx.Int("config-url-retry-attempts"),
|
||||||
|
configURLWatchInterval: cCtx.Duration("config-url-watch-interval"),
|
||||||
watchConfig: cCtx.String("watch-config"),
|
watchConfig: cCtx.String("watch-config"),
|
||||||
pidFile: cCtx.String("pidfile"),
|
pidFile: cCtx.String("pidfile"),
|
||||||
plugindDir: cCtx.String("plugin-directory"),
|
plugindDir: cCtx.String("plugin-directory"),
|
||||||
|
|
@ -279,7 +280,8 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
|
||||||
&cli.IntFlag{
|
&cli.IntFlag{
|
||||||
Name: "config-url-retry-attempts",
|
Name: "config-url-retry-attempts",
|
||||||
Usage: "Number of attempts to obtain a remote configuration via a URL during startup. " +
|
Usage: "Number of attempts to obtain a remote configuration via a URL during startup. " +
|
||||||
"Set to -1 for unlimited attempts. (default: 3)",
|
"Set to -1 for unlimited attempts.",
|
||||||
|
DefaultText: "3",
|
||||||
},
|
},
|
||||||
//
|
//
|
||||||
// String flags
|
// String flags
|
||||||
|
|
@ -330,6 +332,13 @@ func runApp(args []string, outputBuffer io.Writer, pprof Server, c TelegrafConfi
|
||||||
Usage: "enable test mode: gather metrics, print them out, and exit. " +
|
Usage: "enable test mode: gather metrics, print them out, and exit. " +
|
||||||
"Note: Test mode only runs inputs, not processors, aggregators, or outputs",
|
"Note: Test mode only runs inputs, not processors, aggregators, or outputs",
|
||||||
},
|
},
|
||||||
|
//
|
||||||
|
// Duration flags
|
||||||
|
&cli.DurationFlag{
|
||||||
|
Name: "config-url-watch-interval",
|
||||||
|
Usage: "Time duration to check for updates to URL based configuration files",
|
||||||
|
DefaultText: "disabled",
|
||||||
|
},
|
||||||
// TODO: Change "deprecation-list, input-list, output-list" flags to become a subcommand "list" that takes
|
// TODO: Change "deprecation-list, input-list, output-list" flags to become a subcommand "list" that takes
|
||||||
// "input,output,aggregator,processor, deprecated" as parameters
|
// "input,output,aggregator,processor, deprecated" as parameters
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -36,6 +38,7 @@ type GlobalFlags struct {
|
||||||
configDir []string
|
configDir []string
|
||||||
testWait int
|
testWait int
|
||||||
configURLRetryAttempts int
|
configURLRetryAttempts int
|
||||||
|
configURLWatchInterval time.Duration
|
||||||
watchConfig string
|
watchConfig string
|
||||||
pidFile string
|
pidFile string
|
||||||
plugindDir string
|
plugindDir string
|
||||||
|
|
@ -147,11 +150,26 @@ func (t *Telegraf) reloadLoop() error {
|
||||||
syscall.SIGTERM, syscall.SIGINT)
|
syscall.SIGTERM, syscall.SIGINT)
|
||||||
if t.watchConfig != "" {
|
if t.watchConfig != "" {
|
||||||
for _, fConfig := range t.configFiles {
|
for _, fConfig := range t.configFiles {
|
||||||
if _, err := os.Stat(fConfig); err == nil {
|
if isURL(fConfig) {
|
||||||
go t.watchLocalConfig(signals, fConfig)
|
continue
|
||||||
} else {
|
|
||||||
log.Printf("W! Cannot watch config %s: %s", fConfig, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(fConfig); err != nil {
|
||||||
|
log.Printf("W! Cannot watch config %s: %s", fConfig, err)
|
||||||
|
} else {
|
||||||
|
go t.watchLocalConfig(signals, fConfig)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if t.configURLWatchInterval > 0 {
|
||||||
|
remoteConfigs := make([]string, 0)
|
||||||
|
for _, fConfig := range t.configFiles {
|
||||||
|
if isURL(fConfig) {
|
||||||
|
remoteConfigs = append(remoteConfigs, fConfig)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(remoteConfigs) > 0 {
|
||||||
|
go t.watchRemoteConfigs(signals, t.configURLWatchInterval, remoteConfigs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
|
|
@ -194,7 +212,7 @@ func (t *Telegraf) watchLocalConfig(signals chan os.Signal, fConfig string) {
|
||||||
log.Printf("E! Error watching config: %s\n", err)
|
log.Printf("E! Error watching config: %s\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Println("I! Config watcher started")
|
log.Printf("I! Config watcher started for %s\n", fConfig)
|
||||||
select {
|
select {
|
||||||
case <-changes.Modified:
|
case <-changes.Modified:
|
||||||
log.Println("I! Config file modified")
|
log.Println("I! Config file modified")
|
||||||
|
|
@ -221,6 +239,45 @@ func (t *Telegraf) watchLocalConfig(signals chan os.Signal, fConfig string) {
|
||||||
signals <- syscall.SIGHUP
|
signals <- syscall.SIGHUP
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Telegraf) watchRemoteConfigs(signals chan os.Signal, interval time.Duration, remoteConfigs []string) {
|
||||||
|
configs := strings.Join(remoteConfigs, ", ")
|
||||||
|
log.Printf("I! Remote config watcher started for: %s\n", configs)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
lastModified := make(map[string]string, len(remoteConfigs))
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-signals:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
for _, configURL := range remoteConfigs {
|
||||||
|
resp, err := http.Head(configURL) //nolint: gosec // user provided URL
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("W! Error fetching config URL, %s: %s\n", configURL, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
modified := resp.Header.Get("Last-Modified")
|
||||||
|
if modified == "" {
|
||||||
|
log.Printf("E! Last-Modified header not found, stopping the watcher for %s\n", configURL)
|
||||||
|
delete(lastModified, configURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastModified[configURL] == "" {
|
||||||
|
lastModified[configURL] = modified
|
||||||
|
} else if lastModified[configURL] != modified {
|
||||||
|
log.Printf("I! Remote config modified: %s\n", configURL)
|
||||||
|
signals <- syscall.SIGHUP
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Telegraf) loadConfiguration() (*config.Config, error) {
|
func (t *Telegraf) loadConfiguration() (*config.Config, error) {
|
||||||
// If no other options are specified, load the config file and run.
|
// If no other options are specified, load the config file and run.
|
||||||
c := config.NewConfig()
|
c := config.NewConfig()
|
||||||
|
|
@ -386,3 +443,9 @@ func (t *Telegraf) runAgent(ctx context.Context, c *config.Config, reloadConfig
|
||||||
|
|
||||||
return ag.Run(ctx)
|
return ag.Run(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isURL checks if string is valid url
|
||||||
|
func isURL(str string) bool {
|
||||||
|
u, err := url.Parse(str)
|
||||||
|
return err == nil && u.Scheme != "" && u.Host != ""
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -277,7 +277,7 @@ type AgentConfig struct {
|
||||||
|
|
||||||
// Number of attempts to obtain a remote configuration via a URL during
|
// Number of attempts to obtain a remote configuration via a URL during
|
||||||
// startup. Set to -1 for unlimited attempts.
|
// startup. Set to -1 for unlimited attempts.
|
||||||
ConfigURLRetryAttempts int `toml:"config-url-retry-attempts"`
|
ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// InputNames returns a list of strings of the configured inputs.
|
// InputNames returns a list of strings of the configured inputs.
|
||||||
|
|
@ -783,7 +783,6 @@ func fetchConfig(u *url.URL, urlRetryAttempts int) ([]byte, error) {
|
||||||
log.Printf("Using unlimited number of attempts to fetch HTTP config")
|
log.Printf("Using unlimited number of attempts to fetch HTTP config")
|
||||||
} else if urlRetryAttempts == 0 {
|
} else if urlRetryAttempts == 0 {
|
||||||
totalAttempts = 3
|
totalAttempts = 3
|
||||||
log.Printf("Using default number of attempts to fetch HTTP config: %d", totalAttempts)
|
|
||||||
} else if urlRetryAttempts > 0 {
|
} else if urlRetryAttempts > 0 {
|
||||||
totalAttempts = urlRetryAttempts
|
totalAttempts = urlRetryAttempts
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue