From 9b250a18693932ec71ea7d0edab0a2893140b2a2 Mon Sep 17 00:00:00 2001 From: kfollesdal Date: Mon, 14 Apr 2025 18:39:25 +0200 Subject: [PATCH] feat(outputs.nats): Allow asynchronous publishing for Jetstream (#16582) Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- plugins/outputs/nats/README.md | 7 ++++ plugins/outputs/nats/nats.go | 39 +++++++++++++++++-- plugins/outputs/nats/nats_test.go | 1 + plugins/outputs/nats/sample.conf | 7 ++++ .../outputs/nats/testcases/js-async-pub.conf | 11 ++++++ 5 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 plugins/outputs/nats/testcases/js-async-pub.conf diff --git a/plugins/outputs/nats/README.md b/plugins/outputs/nats/README.md index 0d2a62e7b..3990ba6ff 100644 --- a/plugins/outputs/nats/README.md +++ b/plugins/outputs/nats/README.md @@ -74,6 +74,13 @@ to use them. # name = "" # subjects = [] + ## Use asynchronous publishing for higher throughput, but note that it does not guarantee order within batches. + # async_publish = false + + ## Timeout for waiting on acknowledgement on asynchronous publishing + ## String with valid units "ns", "us" (or "µs"), "ms", "s", "m", "h". 
+ # async_ack_timeout = "5s" + ## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams # retention = "limits" # max_consumers = -1 diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index 1316766b8..0e1ba1a02 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -76,6 +76,8 @@ type StreamConfig struct { MirrorDirect bool `toml:"mirror_direct"` ConsumerLimits jetstream.StreamConsumerLimits `toml:"consumer_limits"` Metadata map[string]string `toml:"metadata"` + AsyncPublish bool `toml:"async_publish"` + AsyncAckTimeout config.Duration `toml:"async_ack_timeout"` } func (*NATS) SampleConfig() string { @@ -258,14 +260,24 @@ func (n *NATS) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil } - for _, metric := range metrics { + + var pafs []jetstream.PubAckFuture + if n.Jetstream != nil && n.Jetstream.AsyncPublish { + pafs = make([]jetstream.PubAckFuture, len(metrics)) + } + + for i, metric := range metrics { buf, err := n.serializer.Serialize(metric) if err != nil { n.Log.Debugf("Could not serialize metric: %v", err) continue } if n.Jetstream != nil { - _, err = n.jetstreamClient.Publish(context.Background(), n.Subject, buf, jetstream.WithExpectStream(n.Jetstream.Name)) + if n.Jetstream.AsyncPublish { + pafs[i], err = n.jetstreamClient.PublishAsync(n.Subject, buf, jetstream.WithExpectStream(n.Jetstream.Name)) + } else { + _, err = n.jetstreamClient.Publish(context.Background(), n.Subject, buf, jetstream.WithExpectStream(n.Jetstream.Name)) + } } else { err = n.conn.Publish(n.Subject, buf) } @@ -273,11 +285,32 @@ func (n *NATS) Write(metrics []telegraf.Metric) error { return fmt.Errorf("failed to send NATS message: %w", err) } } + + if pafs != nil { + // Check Ack from async publish + select { + case <-n.jetstreamClient.PublishAsyncComplete(): + for i := range pafs { + select { + case <-pafs[i].Ok(): + continue + case err := <-pafs[i].Err(): + return 
fmt.Errorf("publish acknowledgement is an error: %w (retrying)", err) + } + } + case <-time.After(time.Duration(n.Jetstream.AsyncAckTimeout)): + return fmt.Errorf("waiting for acknowledgement timed out, %d messages pending", n.jetstreamClient.PublishAsyncPending()) + } + } return nil } func init() { outputs.Add("nats", func() telegraf.Output { - return &NATS{} + return &NATS{ + Jetstream: &StreamConfig{ + AsyncAckTimeout: config.Duration(time.Second * 5), + }, + } }) } diff --git a/plugins/outputs/nats/nats_test.go b/plugins/outputs/nats/nats_test.go index 888dd2e82..388cf5d98 100644 --- a/plugins/outputs/nats/nats_test.go +++ b/plugins/outputs/nats/nats_test.go @@ -155,6 +155,7 @@ func TestConfigParsing(t *testing.T) { {name: "Valid Default", path: filepath.Join("testcases", "no-js.conf")}, {name: "Valid JS", path: filepath.Join("testcases", "js-default.conf")}, {name: "Valid JS Config", path: filepath.Join("testcases", "js-config.conf")}, + {name: "Valid JS Async Publish", path: filepath.Join("testcases", "js-async-pub.conf")}, {name: "Subjects warning", path: filepath.Join("testcases", "js-subjects.conf")}, {name: "Invalid JS", path: filepath.Join("testcases", "js-no-stream.conf"), wantErr: true}, } diff --git a/plugins/outputs/nats/sample.conf b/plugins/outputs/nats/sample.conf index acabda25b..3c472e935 100644 --- a/plugins/outputs/nats/sample.conf +++ b/plugins/outputs/nats/sample.conf @@ -41,6 +41,13 @@ # name = "" # subjects = [] + ## Use asynchronous publishing for higher throughput, but note that it does not guarantee order within batches. + # async_publish = false + + ## Timeout for waiting on acknowledgement on asynchronous publishing + ## String with valid units "ns", "us" (or "µs"), "ms", "s", "m", "h". 
+ # async_ack_timeout = "5s" + ## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams # retention = "limits" # max_consumers = -1 diff --git a/plugins/outputs/nats/testcases/js-async-pub.conf b/plugins/outputs/nats/testcases/js-async-pub.conf new file mode 100644 index 000000000..6ee7b7c69 --- /dev/null +++ b/plugins/outputs/nats/testcases/js-async-pub.conf @@ -0,0 +1,11 @@ +## NATS output with async jetstream publish +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + subject = "telegraf-subject" + data_format = "influx" + [outputs.nats.jetstream] + name = "my-stream" + subjects = ["not", "here"] + async_publish = true + async_ack_timeout = "5s"