feat(outputs.nats): Allow asynchronous publishing for Jetstream (#16582)

Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com>
This commit is contained in:
kfollesdal 2025-04-14 18:39:25 +02:00 committed by GitHub
parent 799e194700
commit 9b250a1869
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 3 deletions

View File

@ -74,6 +74,13 @@ to use them.
# name = ""
# subjects = []
## Use asynchronous publishing for higher throughput, but note that it does not guarantee order within batches.
# async_publish = false
## Timeout for wating on acknowledgement on asynchronous publishing
## String with valid units "ns", "us" (or "µs"), "ms", "s", "m", "h".
# async_ack_timeout = "5s"
## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
# retention = "limits"
# max_consumers = -1

View File

@ -76,6 +76,8 @@ type StreamConfig struct {
MirrorDirect bool `toml:"mirror_direct"`
ConsumerLimits jetstream.StreamConsumerLimits `toml:"consumer_limits"`
Metadata map[string]string `toml:"metadata"`
AsyncPublish bool `toml:"async_publish"`
AsyncAckTimeout config.Duration `toml:"async_ack_timeout"`
}
func (*NATS) SampleConfig() string {
@ -258,14 +260,24 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
for _, metric := range metrics {
var pafs []jetstream.PubAckFuture
if n.Jetstream != nil && n.Jetstream.AsyncPublish {
pafs = make([]jetstream.PubAckFuture, len(metrics))
}
for i, metric := range metrics {
buf, err := n.serializer.Serialize(metric)
if err != nil {
n.Log.Debugf("Could not serialize metric: %v", err)
continue
}
if n.Jetstream != nil {
_, err = n.jetstreamClient.Publish(context.Background(), n.Subject, buf, jetstream.WithExpectStream(n.Jetstream.Name))
if n.Jetstream.AsyncPublish {
pafs[i], err = n.jetstreamClient.PublishAsync(n.Subject, buf, jetstream.WithExpectStream(n.Jetstream.Name))
} else {
_, err = n.jetstreamClient.Publish(context.Background(), n.Subject, buf, jetstream.WithExpectStream(n.Jetstream.Name))
}
} else {
err = n.conn.Publish(n.Subject, buf)
}
@ -273,11 +285,32 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("failed to send NATS message: %w", err)
}
}
if pafs != nil {
// Check Ack from async publish
select {
case <-n.jetstreamClient.PublishAsyncComplete():
for i := range pafs {
select {
case <-pafs[i].Ok():
continue
case err := <-pafs[i].Err():
return fmt.Errorf("publish acknowledgement is an error: %w (retrying)", err)
}
}
case <-time.After(time.Duration(n.Jetstream.AsyncAckTimeout)):
return fmt.Errorf("waiting for acknowledgement timed out, %d messages pending", n.jetstreamClient.PublishAsyncPending())
}
}
return nil
}
func init() {
outputs.Add("nats", func() telegraf.Output {
return &NATS{}
return &NATS{
Jetstream: &StreamConfig{
AsyncAckTimeout: config.Duration(time.Second * 5),
},
}
})
}

View File

@ -155,6 +155,7 @@ func TestConfigParsing(t *testing.T) {
{name: "Valid Default", path: filepath.Join("testcases", "no-js.conf")},
{name: "Valid JS", path: filepath.Join("testcases", "js-default.conf")},
{name: "Valid JS Config", path: filepath.Join("testcases", "js-config.conf")},
{name: "Valid JS Async Publish", path: filepath.Join("testcases", "js-async-pub.conf")},
{name: "Subjects warning", path: filepath.Join("testcases", "js-subjects.conf")},
{name: "Invalid JS", path: filepath.Join("testcases", "js-no-stream.conf"), wantErr: true},
}

View File

@ -41,6 +41,13 @@
# name = ""
# subjects = []
## Use asynchronous publishing for higher throughput, but note that it does not guarantee order within batches.
# async_publish = false
## Timeout for wating on acknowledgement on asynchronous publishing
## String with valid units "ns", "us" (or "µs"), "ms", "s", "m", "h".
# async_ack_timeout = "5s"
## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
# retention = "limits"
# max_consumers = -1

View File

@ -0,0 +1,11 @@
## NATS output with async jetstream publish
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
[outputs.nats.jetstream]
name = "my-stream"
subjects = ["not", "here"]
async_publish = true
async_ack_timeout = "5s"