diff --git a/plugins/outputs/nats/README.md b/plugins/outputs/nats/README.md index a9c3855ea..0fad4c1cf 100644 --- a/plugins/outputs/nats/README.md +++ b/plugins/outputs/nats/README.md @@ -39,6 +39,7 @@ to use them. # credentials = "/etc/telegraf/nats.creds" ## NATS subject for producer messages + ## For jetstream this is also the subject where messages will be published subject = "telegraf" ## Use Transport Layer Security @@ -56,4 +57,31 @@ to use them. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" + + ## Jetstream specific configuration. If not nil, it will assume Jetstream context. + ## Since this is a table, it should be present at the end of the plugin section. Else you can use inline table format. + # [outputs.nats.jetstream] + ## Name of the stream, required when using jetstream. Telegraf will + ## use the union of the above subject and below the subjects array. + # name = "" + # subjects = [] + + ## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams + # retention = "limits" + # max_consumers = -1 + # max_msgs_per_subject = -1 + # max_msgs = -1 + # max_bytes = -1 + # max_age = 0 + # max_msg_size = -1 + # storage = "file" + # discard = "old" + # num_replicas = 1 + # duplicate_window = 120000000000 + # sealed = false + # deny_delete = false + # deny_purge = false + # allow_rollup_hdrs = false + # allow_direct = true + # mirror_direct = false ``` diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index ce7f9f31b..b2ee4daf3 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -2,14 +2,19 @@ package nats import ( + "context" _ "embed" + "errors" "fmt" "strings" + "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" @@ -26,13 +31,52 @@ type NATS struct { Password config.Secret `toml:"password"` Credentials string `toml:"credentials"` Subject string `toml:"subject"` - + Jetstream *StreamConfig `toml:"jetstream"` tls.ClientConfig Log telegraf.Logger `toml:"-"` - conn *nats.Conn - serializer serializers.Serializer + conn *nats.Conn + jetstreamClient jetstream.JetStream + jetstreamStreamConfig *jetstream.StreamConfig + serializer serializers.Serializer +} + +// StreamConfig is the configuration for creating stream +// Almost a mirror of https://pkg.go.dev/github.com/nats-io/nats.go/jetstream#StreamConfig but with TOML tags +type StreamConfig struct { + Name string `toml:"name"` + Description string `toml:"description"` + Subjects []string `toml:"subjects"` + Retention string `toml:"retention"` + MaxConsumers int `toml:"max_consumers"` + MaxMsgs int64 `toml:"max_msgs"` + MaxBytes int64 `toml:"max_bytes"` + Discard string `toml:"discard"` + DiscardNewPerSubject bool `toml:"discard_new_per_subject"` + MaxAge config.Duration `toml:"max_age"` + MaxMsgsPerSubject int64 `toml:"max_msgs_per_subject"` + MaxMsgSize int32 `toml:"max_msg_size"` + Storage string `toml:"storage"` + Replicas int `toml:"num_replicas"` + NoAck bool `toml:"no_ack"` + Template string `toml:"template_owner"` + Duplicates config.Duration `toml:"duplicate_window"` + Placement *jetstream.Placement `toml:"placement"` + Mirror *jetstream.StreamSource `toml:"mirror"` + Sources []*jetstream.StreamSource `toml:"sources"` + Sealed bool `toml:"sealed"` + DenyDelete bool `toml:"deny_delete"` + DenyPurge bool `toml:"deny_purge"` + AllowRollup bool `toml:"allow_rollup_hdrs"` + Compression string `toml:"compression"` + FirstSeq uint64 `toml:"first_seq"` + SubjectTransform *jetstream.SubjectTransformConfig `toml:"subject_transform"` + RePublish *jetstream.RePublish `toml:"republish"` + AllowDirect bool `toml:"allow_direct"` + MirrorDirect bool `toml:"mirror_direct"` + ConsumerLimits jetstream.StreamConsumerLimits `toml:"consumer_limits"` + Metadata map[string]string `toml:"metadata"` } func (*NATS) SampleConfig() string { @@ -85,8 +129,125 @@ func (n *NATS) Connect() error { // try and connect n.conn, err = nats.Connect(strings.Join(n.Servers, ","), opts...) + if err != nil { + return err + } - return err + if n.Jetstream != nil { + n.jetstreamClient, err = jetstream.New(n.conn) + if err != nil { + return fmt.Errorf("failed to connect to jetstream: %w", err) + } + _, err = n.jetstreamClient.CreateOrUpdateStream(context.Background(), *n.jetstreamStreamConfig) + if err != nil { + return fmt.Errorf("failed to create or update stream: %w", err) + } + n.Log.Infof("Stream (%s) successfully created or updated", n.Jetstream.Name) + } + return nil +} + +func (n *NATS) getJetstreamConfig() (*jetstream.StreamConfig, error) { + var retention jetstream.RetentionPolicy + switch n.Jetstream.Retention { + case "", "limits": + retention = jetstream.LimitsPolicy + case "interest": + retention = jetstream.InterestPolicy + case "workqueue": + retention = jetstream.WorkQueuePolicy + default: + return nil, fmt.Errorf("invalid 'retention' setting %q", n.Jetstream.Retention) + } + + var discard jetstream.DiscardPolicy + switch n.Jetstream.Discard { + case "", "old": + discard = jetstream.DiscardOld + case "new": + discard = jetstream.DiscardNew + default: + return nil, fmt.Errorf("invalid 'discard' setting %q", n.Jetstream.Discard) + } + + var storage jetstream.StorageType + switch n.Jetstream.Storage { + case "memory": + storage = jetstream.MemoryStorage + case "", "file": + storage = jetstream.FileStorage + default: + return nil, fmt.Errorf("invalid 'storage' setting %q", n.Jetstream.Storage) + } + + var compression jetstream.StoreCompression + switch n.Jetstream.Compression { + case "s2": + compression = jetstream.S2Compression + case "", "none": + compression = jetstream.NoCompression + default: + return nil, fmt.Errorf("invalid 'compression' setting %q", n.Jetstream.Compression) + } + + streamConfig := &jetstream.StreamConfig{ + Name: n.Jetstream.Name, + Description: n.Jetstream.Description, + Subjects: n.Jetstream.Subjects, + Retention: retention, + MaxConsumers: n.Jetstream.MaxConsumers, + MaxMsgs: n.Jetstream.MaxMsgs, + MaxBytes: n.Jetstream.MaxBytes, + Discard: discard, + DiscardNewPerSubject: n.Jetstream.DiscardNewPerSubject, + MaxAge: time.Duration(n.Jetstream.MaxAge), + MaxMsgsPerSubject: n.Jetstream.MaxMsgsPerSubject, + MaxMsgSize: n.Jetstream.MaxMsgSize, + Storage: storage, + Replicas: n.Jetstream.Replicas, + NoAck: n.Jetstream.NoAck, + Template: n.Jetstream.Template, + Duplicates: time.Duration(n.Jetstream.Duplicates), + Placement: n.Jetstream.Placement, + Mirror: n.Jetstream.Mirror, + Sources: n.Jetstream.Sources, + Sealed: n.Jetstream.Sealed, + DenyDelete: n.Jetstream.DenyDelete, + DenyPurge: n.Jetstream.DenyPurge, + AllowRollup: n.Jetstream.AllowRollup, + Compression: compression, + FirstSeq: n.Jetstream.FirstSeq, + SubjectTransform: n.Jetstream.SubjectTransform, + RePublish: n.Jetstream.RePublish, + AllowDirect: n.Jetstream.AllowDirect, + MirrorDirect: n.Jetstream.MirrorDirect, + ConsumerLimits: n.Jetstream.ConsumerLimits, + Metadata: n.Jetstream.Metadata, + } + return streamConfig, nil +} + +func (n *NATS) Init() error { + if n.Jetstream != nil { + if strings.TrimSpace(n.Jetstream.Name) == "" { + return errors.New("stream cannot be empty") + } + + if len(n.Jetstream.Subjects) == 0 { + n.Jetstream.Subjects = []string{n.Subject} + } + // If the overall-subject is already present anywhere in the Jetstream subject we go from there, + // otherwise we should append the overall-subject as the last element. + if !choice.Contains(n.Subject, n.Jetstream.Subjects) { + n.Jetstream.Subjects = append(n.Jetstream.Subjects, n.Subject) + } + var err error + n.jetstreamStreamConfig, err = n.getJetstreamConfig() + if err != nil { + return fmt.Errorf("failed to parse jetstream config: %w", err) + } + } + return nil } func (n *NATS) Close() error { @@ -98,14 +259,13 @@ func (n *NATS) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil } - for _, metric := range metrics { buf, err := n.serializer.Serialize(metric) if err != nil { n.Log.Debugf("Could not serialize metric: %v", err) continue } - + // use the same Publish API for nats core and jetstream err = n.conn.Publish(n.Subject, buf) if err != nil { return fmt.Errorf("FAILED to send NATS message: %w", err) diff --git a/plugins/outputs/nats/nats_test.go b/plugins/outputs/nats/nats_test.go index 85b8caaf7..7ac05ba85 100644 --- a/plugins/outputs/nats/nats_test.go +++ b/plugins/outputs/nats/nats_test.go @@ -1,12 +1,21 @@ package nats import ( + "context" + _ "embed" "fmt" + "path/filepath" "testing" + "time" + "github.com/docker/go-connections/nat" + "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -15,32 +24,162 @@ func TestConnectAndWriteIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - - servicePort := "4222" - container := testutil.Container{ - Image: "nats", - ExposedPorts: []string{servicePort}, - WaitingFor: wait.ForLog("Server is ready"), + natsServicePort := "4222" + type testConfig struct { + name string + container testutil.Container + nats *NATS + streamConfigCompareFunc func(*testing.T, *jetstream.StreamInfo) + wantErr bool } - err := container.Start() - require.NoError(t, err, "failed to start container") - defer container.Terminate() - - server := []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports[servicePort])} - serializer := &influx.Serializer{} - require.NoError(t, serializer.Init()) - n := &NATS{ - Servers: server, - Name: "telegraf", - Subject: "telegraf", - serializer: serializer, + testCases := []testConfig{ + { + name: "valid without jetstream", + container: testutil.Container{ + Image: "nats:latest", + ExposedPorts: []string{natsServicePort}, + WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)), + }, + nats: &NATS{ + Name: "telegraf", + Subject: "telegraf", + serializer: &influx.Serializer{}, + Log: testutil.Logger{}, + }, + }, + { + name: "valid with jetstream", + container: testutil.Container{ + Image: "nats:latest", + ExposedPorts: []string{natsServicePort}, + Cmd: []string{"--js"}, + WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)), + }, + nats: &NATS{ + Name: "telegraf", + Subject: "telegraf", + Jetstream: &StreamConfig{ + Name: "my-telegraf-stream", + }, + serializer: &influx.Serializer{}, + Log: testutil.Logger{}, + }, + streamConfigCompareFunc: func(t *testing.T, si *jetstream.StreamInfo) { + require.Equal(t, "my-telegraf-stream", si.Config.Name) + require.Equal(t, []string{"telegraf"}, si.Config.Subjects) + }, + }, + { + name: "create stream with config", + container: testutil.Container{ + Image: "nats:latest", + ExposedPorts: []string{natsServicePort}, + Cmd: []string{"--js"}, + WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)), + }, + nats: &NATS{ + Name: "telegraf", + Subject: "my-tel-sub-outer", + Jetstream: &StreamConfig{ + Name: "telegraf-stream-with-cfg", + Subjects: []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2"}, + Retention: "workqueue", + MaxConsumers: 10, + Discard: "new", + Storage: "memory", + MaxMsgs: 100_000, + MaxBytes: 104_857_600, + MaxAge: config.Duration(10 * time.Minute), + Duplicates: config.Duration(5 * time.Minute), + MaxMsgSize: 120, + MaxMsgsPerSubject: 500, + }, + serializer: &influx.Serializer{}, + Log: testutil.Logger{}, + }, + streamConfigCompareFunc: func(t *testing.T, si *jetstream.StreamInfo) { + require.Equal(t, "telegraf-stream-with-cfg", si.Config.Name) + require.Equal(t, []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2", "my-tel-sub-outer"}, si.Config.Subjects) + require.Equal(t, jetstream.WorkQueuePolicy, si.Config.Retention) + require.Equal(t, 10, si.Config.MaxConsumers) + require.Equal(t, jetstream.DiscardNew, si.Config.Discard) + require.Equal(t, jetstream.MemoryStorage, si.Config.Storage) + require.Equal(t, int64(100_000), si.Config.MaxMsgs) + require.Equal(t, int64(104_857_600), si.Config.MaxBytes) + require.Equal(t, 10*time.Minute, si.Config.MaxAge) + require.Equal(t, 5*time.Minute, si.Config.Duplicates) + require.Equal(t, int32(120), si.Config.MaxMsgSize) + require.Equal(t, int64(500), si.Config.MaxMsgsPerSubject) + }, + }, } - // Verify that we can connect to the NATS daemon - err = n.Connect() - require.NoError(t, err) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.container.Start() + require.NoError(t, err, "failed to start container") + defer tc.container.Terminate() - // Verify that we can successfully write data to the NATS daemon - err = n.Write(testutil.MockMetrics()) - require.NoError(t, err) + server := []string{fmt.Sprintf("nats://%s:%s", tc.container.Address, tc.container.Ports[natsServicePort])} + tc.nats.Servers = server + // Verify that we can connect to the NATS daemon + require.NoError(t, tc.nats.Init()) + err = tc.nats.Connect() + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + if tc.nats.Jetstream != nil { + stream, err := tc.nats.jetstreamClient.Stream(context.Background(), tc.nats.Jetstream.Name) + require.NoError(t, err) + si, err := stream.Info(context.Background()) + require.NoError(t, err) + + tc.streamConfigCompareFunc(t, si) + } + // Verify that we can successfully write data to the NATS daemon + err = tc.nats.Write(testutil.MockMetrics()) + require.NoError(t, err) + }) + } +} + +func TestConfigParsing(t *testing.T) { + // Define test cases + testCases := []struct { + name string + path string + wantErr bool + }{ + {name: "Valid Default", path: filepath.Join("testcases", "no-js.conf")}, + {name: "Valid JS", path: filepath.Join("testcases", "js-default.conf")}, + {name: "Valid JS Config", path: filepath.Join("testcases", "js-config.conf")}, + {name: "Subjects warning", path: filepath.Join("testcases", "js-subjects.conf")}, + {name: "Invalid JS", path: filepath.Join("testcases", "js-no-stream.conf"), wantErr: true}, + } + + // Register the plugin + outputs.Add("nats", func() telegraf.Output { + return &NATS{} + }) + srl := &influx.Serializer{} + require.NoError(t, srl.Init()) + + // Run tests using the table-driven approach + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(tc.path)) + require.Len(t, cfg.Outputs, 1) + err := cfg.Outputs[0].Init() + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } } diff --git a/plugins/outputs/nats/sample.conf b/plugins/outputs/nats/sample.conf index 4ae9a9319..23b27d394 100644 --- a/plugins/outputs/nats/sample.conf +++ b/plugins/outputs/nats/sample.conf @@ -14,6 +14,7 @@ # credentials = "/etc/telegraf/nats.creds" ## NATS subject for producer messages + ## For jetstream this is also the subject where messages will be published subject = "telegraf" ## Use Transport Layer Security @@ -31,3 +32,30 @@ ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" + + ## Jetstream specific configuration. If not nil, it will assume Jetstream context. + ## Since this is a table, it should be present at the end of the plugin section. Else you can use inline table format. + # [outputs.nats.jetstream] + ## Name of the stream, required when using jetstream. Telegraf will + ## use the union of the above subject and below the subjects array. + # name = "" + # subjects = [] + + ## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams + # retention = "limits" + # max_consumers = -1 + # max_msgs_per_subject = -1 + # max_msgs = -1 + # max_bytes = -1 + # max_age = 0 + # max_msg_size = -1 + # storage = "file" + # discard = "old" + # num_replicas = 1 + # duplicate_window = 120000000000 + # sealed = false + # deny_delete = false + # deny_purge = false + # allow_rollup_hdrs = false + # allow_direct = true + # mirror_direct = false diff --git a/plugins/outputs/nats/testcases/js-config.conf b/plugins/outputs/nats/testcases/js-config.conf new file mode 100644 index 000000000..56edf9fab --- /dev/null +++ b/plugins/outputs/nats/testcases/js-config.conf @@ -0,0 +1,16 @@ +## NATS output with jetstream stream config provided +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + subject = "telegraf-subject" + data_format = "influx" + [outputs.nats.jetstream] + name = "my-telegraf-stream" + retention = "workqueue" + max_consumers = 10 + discard = "old" + storage = "memory" + max_msgs = 100000 + max_bytes = 104857600 # 100 MB + max_age = 86400000000000 # in the int64 format + num_replicas = 1 \ No newline at end of file diff --git a/plugins/outputs/nats/testcases/js-default.conf b/plugins/outputs/nats/testcases/js-default.conf new file mode 100644 index 000000000..4f4fea4f9 --- /dev/null +++ b/plugins/outputs/nats/testcases/js-default.conf @@ -0,0 +1,8 @@ +## NATS output with jetstream, but no config provided(use defaults) +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + subject = "telegraf-subject" + data_format = "influx" + [outputs.nats.jetstream] + name = "my-telegraf-stream" \ No newline at end of file diff --git a/plugins/outputs/nats/testcases/js-no-stream.conf b/plugins/outputs/nats/testcases/js-no-stream.conf new file mode 100644 index 000000000..7ff526bec --- /dev/null +++ b/plugins/outputs/nats/testcases/js-no-stream.conf @@ -0,0 +1,7 @@ +## NATS output with jetstream, but empty stream +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + subject = "telegraf-subject" + data_format = "influx" + [outputs.nats.jetstream] \ No newline at end of file diff --git a/plugins/outputs/nats/testcases/js-subjects.conf b/plugins/outputs/nats/testcases/js-subjects.conf new file mode 100644 index 000000000..acbf08762 --- /dev/null +++ b/plugins/outputs/nats/testcases/js-subjects.conf @@ -0,0 +1,9 @@ +## NATS output with jetstream, but subject mismatch +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + subject = "telegraf-subject" + data_format = "influx" + [outputs.nats.jetstream] + name = "my-stream" + subjects = ["not", "here"] \ No newline at end of file diff --git a/plugins/outputs/nats/testcases/no-js.conf b/plugins/outputs/nats/testcases/no-js.conf new file mode 100644 index 000000000..98ad7ad77 --- /dev/null +++ b/plugins/outputs/nats/testcases/no-js.conf @@ -0,0 +1,6 @@ +## NATS output with jetstream, but no config provided(use defaults) +[[outputs.nats]] + ## URLs of NATS servers + servers = ["nats://localhost:4222"] + subject = "telegraf-subject" + data_format = "influx" \ No newline at end of file