diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go index 4c1589b48..d38f17ffd 100644 --- a/plugins/inputs/execd/shim/goshim.go +++ b/plugins/inputs/execd/shim/goshim.go @@ -106,7 +106,7 @@ func (s *Shim) Run(pollInterval time.Duration) error { } gatherPromptCh := make(chan empty, 1) s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh) - wg.Add(1) + wg.Add(1) // one per input go func(input telegraf.Input) { startGathering(ctx, input, acc, gatherPromptCh, pollInterval) if serviceInput, ok := input.(telegraf.ServiceInput); ok { @@ -216,11 +216,7 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu select { case <-ctx.Done(): return - case _, open := <-gatherPromptCh: - if !open { - // stdin has closed. - return - } + case <-gatherPromptCh: if err := input.Gather(acc); err != nil { fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) } diff --git a/plugins/inputs/execd/shim/shim_posix_test.go b/plugins/inputs/execd/shim/shim_posix_test.go index de549cc3c..00e5dc6c3 100644 --- a/plugins/inputs/execd/shim/shim_posix_test.go +++ b/plugins/inputs/execd/shim/shim_posix_test.go @@ -51,13 +51,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) { } }() - timeout := time.NewTimer(10 * time.Second) - - select { - case <-metricProcessed: - case <-timeout.C: - require.Fail(t, "Timeout waiting for metric to arrive") - } + <-metricProcessed cancel() r := bufio.NewReader(stdoutReader) @@ -66,5 +60,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) { require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out) stdinWriter.Close() + readUntilEmpty(r) + <-exited } diff --git a/plugins/inputs/execd/shim/shim_test.go b/plugins/inputs/execd/shim/shim_test.go index 5fd79895f..2a31e5adc 100644 --- a/plugins/inputs/execd/shim/shim_test.go +++ b/plugins/inputs/execd/shim/shim_test.go @@ -21,22 +21,12 @@ func TestShimWorks(t *testing.T) { stdin, _ = io.Pipe() // hold the stdin pipe open - timeout := time.NewTimer(10 * time.Second) metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond) - select { - case <-metricProcessed: - case <-timeout.C: - require.Fail(t, "Timeout waiting for metric to arrive") - } + <-metricProcessed for stdoutBytes.Len() == 0 { - select { - case <-timeout.C: - require.Fail(t, "Timeout waiting to read metric from stdout") - return - default: - time.Sleep(10 * time.Millisecond) - } + t.Log("Waiting for bytes available in stdout") + time.Sleep(10 * time.Millisecond) } out := string(stdoutBytes.Bytes()) @@ -52,16 +42,11 @@ func TestShimStdinSignalingWorks(t *testing.T) { stdin = stdinReader stdout = stdoutWriter - timeout := time.NewTimer(10 * time.Second) metricProcessed, exited := runInputPlugin(t, 40*time.Second) stdinWriter.Write([]byte("\n")) - select { - case <-metricProcessed: - case <-timeout.C: - require.Fail(t, "Timeout waiting for metric to arrive") - } + <-metricProcessed r := bufio.NewReader(stdoutReader) out, err := r.ReadString('\n') @@ -69,12 +54,15 @@ func TestShimStdinSignalingWorks(t *testing.T) { require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out) stdinWriter.Close() + + readUntilEmpty(r) + // check that it exits cleanly <-exited } func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) { - metricProcessed = make(chan bool) + metricProcessed = make(chan bool, 10) exited = make(chan bool) inp := &testInput{ metricProcessed: metricProcessed, @@ -172,3 +160,15 @@ func (i *serviceInput) Start(acc telegraf.Accumulator) error { func (i *serviceInput) Stop() { } + +// we can get stuck if stdout gets clogged up and nobody's reading from it. +// make sure we keep it going +func readUntilEmpty(r *bufio.Reader) { + go func() { + var err error + for err != io.EOF { + _, err = r.ReadString('\n') + time.Sleep(10 * time.Millisecond) + } + }() +}