diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index d0a79293f..739d7c89f 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -103,4 +103,4 @@ Which data you will get depends on the subjects you consume from nats ## Example Output Depends on the nats subject input -nats_consumer,host=[] value=1.9 1655972309339341000 +nats_consumer,host=foo,subject=recvsubj value=1.9 1655972309339341000 diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 7e3ae3aa7..a9d27ee89 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -93,8 +93,6 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro func (n *natsConsumer) Start(acc telegraf.Accumulator) error { n.acc = acc.WithTracking(n.MaxUndeliveredMessages) - var connectErr error - options := []nats.Option{ nats.MaxReconnects(-1), nats.ErrorHandler(n.natsErrHandler), @@ -119,6 +117,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { } if n.conn == nil || n.conn.IsClosed() { + var connectErr error n.conn, connectErr = nats.Connect(strings.Join(n.Servers, ","), options...) if connectErr != nil { return connectErr @@ -219,7 +218,9 @@ func (n *natsConsumer) receiver(ctx context.Context) { <-sem continue } - + for _, m := range metrics { + m.AddTag("subject", msg.Subject) + } n.acc.AddTrackingMetricGroup(metrics) } } @@ -260,7 +261,6 @@ func init() { inputs.Add("nats_consumer", func() telegraf.Input { return &natsConsumer{ Servers: []string{"nats://localhost:4222"}, - Secure: false, Subjects: []string{"telegraf"}, QueueGroup: "telegraf_consumers", PendingBytesLimit: nats.DefaultSubPendingBytesLimit, diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go new file mode 100644 index 000000000..e600f4821 --- /dev/null +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -0,0 +1,216 @@ +package nats_consumer + +import ( + "fmt" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" +) + +func TestStartStop(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := testutil.Container{ + Image: "nats", + ExposedPorts: []string{"4222"}, + WaitingFor: wait.ForLog("Server is ready"), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + plugin := &natsConsumer{ + Servers: []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"])}, + Subjects: []string{"telegraf"}, + QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, + PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + Log: testutil.Logger{}, + } + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + plugin.Stop() +} + +func TestSendReceive(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := testutil.Container{ + Image: "nats", + ExposedPorts: []string{"4222"}, + WaitingFor: wait.ForLog("Server is ready"), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + addr := fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"]) + + tests := []struct { + name string + msgs map[string][]string + expected []telegraf.Metric + }{ + { + name: "single message", + msgs: map[string][]string{ + "telegraf": {"test,source=foo value=42i"}, + }, + expected: []telegraf.Metric{ + metric.New( + "test", + map[string]string{ + "source": "foo", + "subject": "telegraf", + }, + map[string]interface{}{"value": int64(42)}, + time.Unix(0, 0), + ), + }, + }, + { + name: "multiple message", + msgs: map[string][]string{ + "telegraf": { + "test,source=foo value=42i", + "test,source=bar value=23i", + }, + "hitchhiker": { + "wale,part=front named=true", + "wale,part=back named=false", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "test", + map[string]string{ + "source": "foo", + "subject": "telegraf", + }, + map[string]interface{}{"value": int64(42)}, + time.Unix(0, 0), + ), + metric.New( + "test", + map[string]string{ + "source": "bar", + "subject": "telegraf", + }, + map[string]interface{}{"value": int64(23)}, + time.Unix(0, 0), + ), + metric.New( + "wale", + map[string]string{ + "part": "front", + "subject": "hitchhiker", + }, + map[string]interface{}{"named": true}, + time.Unix(0, 0), + ), + metric.New( + "wale", + map[string]string{ + "part": "back", + "subject": "hitchhiker", + }, + map[string]interface{}{"named": false}, + time.Unix(0, 0), + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + subjects := make([]string, 0, len(tt.msgs)) + for k := range tt.msgs { + subjects = append(subjects, k) + } + + // Setup the plugin + plugin := &natsConsumer{ + Servers: []string{addr}, + Subjects: subjects, + QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, + PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + Log: testutil.Logger{}, + } + + // Add a line-protocol parser + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Startup the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Send all messages to the topics (random order due to Golang map) + publisher := &sender{Addr: addr} + require.NoError(t, publisher.Connect()) + defer publisher.Disconnect() + for topic, msgs := range tt.msgs { + for _, msg := range msgs { + require.NoError(t, publisher.Send(topic, msg)) + } + } + publisher.Disconnect() + + // Wait for the metrics to be collected + require.Eventually(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(tt.expected)) + }, time.Second, 100*time.Millisecond) + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime(), testutil.SortMetrics()) + }) + } +} + +type sender struct { + Addr string + + Username string + Password string + + conn *nats.Conn +} + +func (s *sender) Connect() error { + conn, err := nats.Connect(s.Addr) + if err != nil { + return err + } + s.conn = conn + + return nil +} + +func (s *sender) Disconnect() { + if s.conn != nil && !s.conn.IsClosed() { + _ = s.conn.Flush() + s.conn.Close() + } + s.conn = nil +} + +func (s *sender) Send(topic, msg string) error { + return s.conn.Publish(topic, []byte(msg)) +}