From ac685d19f850e59cb8a241e82635374fb7a365e0 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 20 Oct 2015 16:45:31 -0600 Subject: [PATCH] Clean up logging messages and add flusher startup delay Fixes #294 --- agent.go | 28 ++++++++++++++-------------- cmd/telegraf/telegraf.go | 4 ++-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/agent.go b/agent.go index 475dcf3d2..aa146ca3d 100644 --- a/agent.go +++ b/agent.go @@ -1,7 +1,6 @@ package telegraf import ( - "errors" "fmt" "log" "os" @@ -212,7 +211,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { wg.Wait() elapsed := time.Since(start) - log.Printf("Default (%s) interval, gathered metrics from %d plugins in %s\n", + log.Printf("Gathered metrics, (%s interval), from %d plugins in %s\n", a.Interval, counter, elapsed) return nil } @@ -240,7 +239,7 @@ func (a *Agent) gatherSeparate( } elapsed := time.Since(start) - log.Printf("Separate (%s) interval, gathered metrics from %s in %s\n", + log.Printf("Gathered metrics, (separate %s interval), from %s in %s\n", plugin.config.Interval, plugin.name, elapsed) if outerr != nil { @@ -294,29 +293,35 @@ func (a *Agent) Test() error { return nil } -func (a *Agent) flush(points []*client.Point) error { +func (a *Agent) flush(points []*client.Point) { var wg sync.WaitGroup - var outerr error + start := time.Now() + counter := 0 for _, o := range a.outputs { wg.Add(1) + counter++ go func(ro *runningOutput) { defer wg.Done() // Log all output errors: if err := ro.output.Write(points); err != nil { - log.Printf("Error in output [%s]: %s", ro.name, err) - outerr = errors.New("Error encountered flushing outputs") + log.Printf("Error in output [%s]: %s", ro.name, err.Error()) } }(o) } wg.Wait() - return outerr + elapsed := time.Since(start) + log.Printf("Flushed %d metrics to %d output sinks in %s\n", + len(points), counter, elapsed) } // flusher monitors the points input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { + // Inelegant, but this sleep is to allow the Gather threads to run, so that + // the flusher will flush after metrics are collected. + time.Sleep(time.Millisecond * 100) ticker := time.NewTicker(a.FlushInterval.Duration) points := make([]*client.Point, 0) for { @@ -324,12 +329,7 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er case <-shutdown: return nil case <-ticker.C: - start := time.Now() - if err := a.flush(points); err != nil { - log.Printf(err.Error()) - } - elapsed := time.Since(start) - log.Printf("Flushed %d metrics in %s\n", len(points), elapsed) + a.flush(points) points = make([]*client.Point, 0) case pt := <-pointChan: points = append(points, pt) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index bf53d45b2..065cc82dc 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -133,8 +133,8 @@ func main() { log.Printf("Loaded outputs: %s", strings.Join(outputs, " ")) log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+ - "Precision:%#v, UTC: %#v\n", - ag.Interval, ag.Debug, ag.Hostname, ag.Precision, ag.UTC) + "Flush Interval:%s\n", + ag.Interval, ag.Debug, ag.Hostname, ag.FlushInterval) log.Printf("Tags enabled: %s", config.ListTags()) if *fPidfile != "" {