feat(inputs.nats_consumer): Add receiver subject as tag (#13293)

This commit is contained in:
Sven Rebhan 2023-05-19 20:39:59 +02:00 committed by GitHub
parent 1b74a25252
commit ed72510fe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 221 additions and 5 deletions

View File

@ -103,4 +103,4 @@ Which data you will get depends on the subjects you consume from nats
## Example Output
Depends on the nats subject input
nats_consumer,host=[] value=1.9 1655972309339341000
nats_consumer,host=foo,subject=recvsubj value=1.9 1655972309339341000

View File

@ -93,8 +93,6 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.acc = acc.WithTracking(n.MaxUndeliveredMessages)
var connectErr error
options := []nats.Option{
nats.MaxReconnects(-1),
nats.ErrorHandler(n.natsErrHandler),
@ -119,6 +117,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
}
if n.conn == nil || n.conn.IsClosed() {
var connectErr error
n.conn, connectErr = nats.Connect(strings.Join(n.Servers, ","), options...)
if connectErr != nil {
return connectErr
@ -219,7 +218,9 @@ func (n *natsConsumer) receiver(ctx context.Context) {
<-sem
continue
}
for _, m := range metrics {
m.AddTag("subject", msg.Subject)
}
n.acc.AddTrackingMetricGroup(metrics)
}
}
@ -260,7 +261,6 @@ func init() {
inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{
Servers: []string{"nats://localhost:4222"},
Secure: false,
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,

View File

@ -0,0 +1,216 @@
package nats_consumer
import (
"fmt"
"testing"
"time"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)
func TestStartStop(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "nats",
ExposedPorts: []string{"4222"},
WaitingFor: wait.ForLog("Server is ready"),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
plugin := &natsConsumer{
Servers: []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"])},
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Log: testutil.Logger{},
}
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
plugin.Stop()
}
func TestSendReceive(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "nats",
ExposedPorts: []string{"4222"},
WaitingFor: wait.ForLog("Server is ready"),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
addr := fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"])
tests := []struct {
name string
msgs map[string][]string
expected []telegraf.Metric
}{
{
name: "single message",
msgs: map[string][]string{
"telegraf": {"test,source=foo value=42i"},
},
expected: []telegraf.Metric{
metric.New(
"test",
map[string]string{
"source": "foo",
"subject": "telegraf",
},
map[string]interface{}{"value": int64(42)},
time.Unix(0, 0),
),
},
},
{
name: "multiple message",
msgs: map[string][]string{
"telegraf": {
"test,source=foo value=42i",
"test,source=bar value=23i",
},
"hitchhiker": {
"wale,part=front named=true",
"wale,part=back named=false",
},
},
expected: []telegraf.Metric{
metric.New(
"test",
map[string]string{
"source": "foo",
"subject": "telegraf",
},
map[string]interface{}{"value": int64(42)},
time.Unix(0, 0),
),
metric.New(
"test",
map[string]string{
"source": "bar",
"subject": "telegraf",
},
map[string]interface{}{"value": int64(23)},
time.Unix(0, 0),
),
metric.New(
"wale",
map[string]string{
"part": "front",
"subject": "hitchhiker",
},
map[string]interface{}{"named": true},
time.Unix(0, 0),
),
metric.New(
"wale",
map[string]string{
"part": "back",
"subject": "hitchhiker",
},
map[string]interface{}{"named": false},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
subjects := make([]string, 0, len(tt.msgs))
for k := range tt.msgs {
subjects = append(subjects, k)
}
// Setup the plugin
plugin := &natsConsumer{
Servers: []string{addr},
Subjects: subjects,
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Log: testutil.Logger{},
}
// Add a line-protocol parser
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
// Startup the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
// Send all messages to the topics (random order due to Golang map)
publisher := &sender{Addr: addr}
require.NoError(t, publisher.Connect())
defer publisher.Disconnect()
for topic, msgs := range tt.msgs {
for _, msg := range msgs {
require.NoError(t, publisher.Send(topic, msg))
}
}
publisher.Disconnect()
// Wait for the metrics to be collected
require.Eventually(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(tt.expected))
}, time.Second, 100*time.Millisecond)
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime(), testutil.SortMetrics())
})
}
}
type sender struct {
Addr string
Username string
Password string
conn *nats.Conn
}
func (s *sender) Connect() error {
conn, err := nats.Connect(s.Addr)
if err != nil {
return err
}
s.conn = conn
return nil
}
func (s *sender) Disconnect() {
if s.conn != nil && !s.conn.IsClosed() {
_ = s.conn.Flush()
s.conn.Close()
}
s.conn = nil
}
func (s *sender) Send(topic, msg string) error {
return s.conn.Publish(topic, []byte(msg))
}