Flaky shim test (#7656)

This commit is contained in:
Steven Soroka 2020-06-09 16:24:02 -04:00 committed by GitHub
parent e00424d7b2
commit dda46ea32b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 33 deletions

View File

@ -106,7 +106,7 @@ func (s *Shim) Run(pollInterval time.Duration) error {
} }
gatherPromptCh := make(chan empty, 1) gatherPromptCh := make(chan empty, 1)
s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh) s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh)
wg.Add(1) wg.Add(1) // one per input
go func(input telegraf.Input) { go func(input telegraf.Input) {
startGathering(ctx, input, acc, gatherPromptCh, pollInterval) startGathering(ctx, input, acc, gatherPromptCh, pollInterval)
if serviceInput, ok := input.(telegraf.ServiceInput); ok { if serviceInput, ok := input.(telegraf.ServiceInput); ok {
@ -216,11 +216,7 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case _, open := <-gatherPromptCh: case <-gatherPromptCh:
if !open {
// stdin has closed.
return
}
if err := input.Gather(acc); err != nil { if err := input.Gather(acc); err != nil {
fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err) fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err)
} }

View File

@ -51,13 +51,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
} }
}() }()
timeout := time.NewTimer(10 * time.Second) <-metricProcessed
select {
case <-metricProcessed:
case <-timeout.C:
require.Fail(t, "Timeout waiting for metric to arrive")
}
cancel() cancel()
r := bufio.NewReader(stdoutReader) r := bufio.NewReader(stdoutReader)
@ -66,5 +60,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out) require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out)
stdinWriter.Close() stdinWriter.Close()
readUntilEmpty(r)
<-exited <-exited
} }

View File

@ -21,22 +21,12 @@ func TestShimWorks(t *testing.T) {
stdin, _ = io.Pipe() // hold the stdin pipe open stdin, _ = io.Pipe() // hold the stdin pipe open
timeout := time.NewTimer(10 * time.Second)
metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond) metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond)
select { <-metricProcessed
case <-metricProcessed:
case <-timeout.C:
require.Fail(t, "Timeout waiting for metric to arrive")
}
for stdoutBytes.Len() == 0 { for stdoutBytes.Len() == 0 {
select { t.Log("Waiting for bytes available in stdout")
case <-timeout.C: time.Sleep(10 * time.Millisecond)
require.Fail(t, "Timeout waiting to read metric from stdout")
return
default:
time.Sleep(10 * time.Millisecond)
}
} }
out := string(stdoutBytes.Bytes()) out := string(stdoutBytes.Bytes())
@ -52,16 +42,11 @@ func TestShimStdinSignalingWorks(t *testing.T) {
stdin = stdinReader stdin = stdinReader
stdout = stdoutWriter stdout = stdoutWriter
timeout := time.NewTimer(10 * time.Second)
metricProcessed, exited := runInputPlugin(t, 40*time.Second) metricProcessed, exited := runInputPlugin(t, 40*time.Second)
stdinWriter.Write([]byte("\n")) stdinWriter.Write([]byte("\n"))
select { <-metricProcessed
case <-metricProcessed:
case <-timeout.C:
require.Fail(t, "Timeout waiting for metric to arrive")
}
r := bufio.NewReader(stdoutReader) r := bufio.NewReader(stdoutReader)
out, err := r.ReadString('\n') out, err := r.ReadString('\n')
@ -69,12 +54,15 @@ func TestShimStdinSignalingWorks(t *testing.T) {
require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out) require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out)
stdinWriter.Close() stdinWriter.Close()
readUntilEmpty(r)
// check that it exits cleanly // check that it exits cleanly
<-exited <-exited
} }
func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) { func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) {
metricProcessed = make(chan bool) metricProcessed = make(chan bool, 10)
exited = make(chan bool) exited = make(chan bool)
inp := &testInput{ inp := &testInput{
metricProcessed: metricProcessed, metricProcessed: metricProcessed,
@ -172,3 +160,15 @@ func (i *serviceInput) Start(acc telegraf.Accumulator) error {
func (i *serviceInput) Stop() { func (i *serviceInput) Stop() {
} }
// we can get stuck if stdout gets clogged up and nobody's reading from it.
// make sure we keep it going
func readUntilEmpty(r *bufio.Reader) {
go func() {
var err error
for err != io.EOF {
_, err = r.ReadString('\n')
time.Sleep(10 * time.Millisecond)
}
}()
}