feat: Nats Jetstream consumer add simple support for jetstream subjects (#11373)

This commit is contained in:
Bertram Holzer 2022-07-18 21:01:53 +02:00 committed by GitHub
parent 5e418d7073
commit 4766d0cbaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 5 deletions

View File

@ -15,8 +15,19 @@ instances of telegraf can read from a NATS cluster in parallel.
servers = ["nats://localhost:4222"] servers = ["nats://localhost:4222"]
## subject(s) to consume ## subject(s) to consume
## If you use jetstream you need to set the subjects
## in jetstream_subjects
subjects = ["telegraf"] subjects = ["telegraf"]
## jetstream subjects
## jetstream is a streaming technology inside of nats.
## With jetstream the nats-server persists messages and
## a consumer can consume historical messages. This is
## useful when telegraf needs to restart it don't miss a
## message. You need to configure the nats-server:
## https://docs.nats.io/nats-concepts/jetstream.
jetstream_subjects = ["js_telegraf"]
## name a queue group ## name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"
@ -62,3 +73,12 @@ instances of telegraf can read from a NATS cluster in parallel.
[nats]: https://www.nats.io/about/ [nats]: https://www.nats.io/about/
[input data formats]: /docs/DATA_FORMATS_INPUT.md [input data formats]: /docs/DATA_FORMATS_INPUT.md
[queue group]: https://www.nats.io/documentation/concepts/nats-queueing/ [queue group]: https://www.nats.io/documentation/concepts/nats-queueing/
## Metrics
Which data you will get depends on the subjects you consume from nats
## Example Output
Depends on the nats subject input
nats_consumer,host=[] value=1.9 1655972309339341000

View File

@ -46,6 +46,7 @@ type natsConsumer struct {
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
Credentials string `toml:"credentials"` Credentials string `toml:"credentials"`
JsSubjects []string `toml:"jetstream_subjects"`
tls.ClientConfig tls.ClientConfig
@ -59,7 +60,9 @@ type natsConsumer struct {
MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"` MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"`
conn *nats.Conn conn *nats.Conn
jsConn nats.JetStreamContext
subs []*nats.Subscription subs []*nats.Subscription
jsSubs []*nats.Subscription
parser parsers.Parser parser parsers.Parser
// channel for all incoming NATS messages // channel for all incoming NATS messages
@ -142,6 +145,33 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.subs = append(n.subs, sub) n.subs = append(n.subs, sub)
} }
if len(n.JsSubjects) > 0 {
var connErr error
n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256))
if connErr != nil {
return connErr
}
if n.jsConn != nil {
for _, jsSub := range n.JsSubjects {
sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}
// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
if err != nil {
return err
}
n.jsSubs = append(n.jsSubs, sub)
}
}
}
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -154,8 +184,8 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
go n.receiver(ctx) go n.receiver(ctx)
}() }()
n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v", n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, jssubjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup) n.conn.ConnectedUrl(), n.Subjects, n.JsSubjects, n.QueueGroup)
return nil return nil
} }
@ -201,7 +231,14 @@ func (n *natsConsumer) clean() {
for _, sub := range n.subs { for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil { if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err.Error()) sub.Subject, sub.Queue, err)
}
}
for _, sub := range n.jsSubs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
} }
} }

View File

@ -4,8 +4,19 @@
servers = ["nats://localhost:4222"] servers = ["nats://localhost:4222"]
## subject(s) to consume ## subject(s) to consume
## If you use jetstream you need to set the subjects
## in jetstream_subjects
subjects = ["telegraf"] subjects = ["telegraf"]
## jetstream subjects
## jetstream is a streaming technology inside of nats.
## With jetstream the nats-server persists messages and
## a consumer can consume historical messages. This is
## useful when telegraf needs to restart it don't miss a
## message. You need to configure the nats-server.
## https://docs.nats.io/nats-concepts/jetstream.
jetstream_subjects = ["js_telegraf"]
## name a queue group ## name a queue group
queue_group = "telegraf_consumers" queue_group = "telegraf_consumers"