feat(outputs.nats): Introduce NATS Jetstream option (#14236)

Co-authored-by: Joshua Powers <powersj@fastmail.com>
Co-authored-by: Thomas Casteleyn <thomas.casteleyn@me.com>
This commit is contained in:
Neelay Upadhyaya 2024-01-23 16:52:24 +05:30 committed by GitHub
parent 57021be82c
commit 06c13666e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 431 additions and 30 deletions

View File

@ -39,6 +39,7 @@ to use them.
# credentials = "/etc/telegraf/nats.creds"
## NATS subject for producer messages
## For jetstream this is also the subject where messages will be published
subject = "telegraf"
## Use Transport Layer Security
@ -56,4 +57,31 @@ to use them.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
## Jetstream specific configuration. If this table is present, the plugin assumes a Jetstream context.
## Since this is a table, it should be placed at the end of the plugin section. Otherwise, use the inline table format.
# [outputs.nats.jetstream]
## Name of the stream, required when using jetstream. Telegraf will
## use the union of the subject above and the subjects array below.
# name = ""
# subjects = []
## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
# retention = "limits"
# max_consumers = -1
# max_msgs_per_subject = -1
# max_msgs = -1
# max_bytes = -1
# max_age = 0
# max_msg_size = -1
# storage = "file"
# discard = "old"
# num_replicas = 1
# duplicate_window = 120000000000
# sealed = false
# deny_delete = false
# deny_purge = false
# allow_rollup_hdrs = false
# allow_direct = true
# mirror_direct = false
```

View File

@ -2,14 +2,19 @@
package nats
import (
"context"
_ "embed"
"errors"
"fmt"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
@ -26,13 +31,52 @@ type NATS struct {
Password config.Secret `toml:"password"`
Credentials string `toml:"credentials"`
Subject string `toml:"subject"`
Jetstream *StreamConfig `toml:"jetstream"`
tls.ClientConfig
Log telegraf.Logger `toml:"-"`
conn *nats.Conn
serializer serializers.Serializer
conn *nats.Conn
jetstreamClient jetstream.JetStream
jetstreamStreamConfig *jetstream.StreamConfig
serializer serializers.Serializer
}
// StreamConfig holds the user-facing (TOML) settings of the Jetstream stream
// that the plugin creates or updates when connecting.
// It is almost a mirror of https://pkg.go.dev/github.com/nats-io/nats.go/jetstream#StreamConfig
// but with TOML tags. Enum-like settings are kept as plain strings and
// durations as config.Duration; getJetstreamConfig converts them into the
// corresponding jetstream types.
type StreamConfig struct {
// Name of the stream; Init rejects a blank name.
Name string `toml:"name"`
Description string `toml:"description"`
// Subjects covered by the stream; Init appends the plugin's publish
// subject when it is not already listed.
Subjects []string `toml:"subjects"`
// Retention accepts "limits" (also used when empty), "interest" or "workqueue".
Retention string `toml:"retention"`
MaxConsumers int `toml:"max_consumers"`
MaxMsgs int64 `toml:"max_msgs"`
MaxBytes int64 `toml:"max_bytes"`
// Discard accepts "old" (also used when empty) or "new".
Discard string `toml:"discard"`
DiscardNewPerSubject bool `toml:"discard_new_per_subject"`
MaxAge config.Duration `toml:"max_age"`
MaxMsgsPerSubject int64 `toml:"max_msgs_per_subject"`
MaxMsgSize int32 `toml:"max_msg_size"`
// Storage accepts "file" (also used when empty) or "memory".
Storage string `toml:"storage"`
Replicas int `toml:"num_replicas"`
NoAck bool `toml:"no_ack"`
Template string `toml:"template_owner"`
Duplicates config.Duration `toml:"duplicate_window"`
Placement *jetstream.Placement `toml:"placement"`
Mirror *jetstream.StreamSource `toml:"mirror"`
Sources []*jetstream.StreamSource `toml:"sources"`
Sealed bool `toml:"sealed"`
DenyDelete bool `toml:"deny_delete"`
DenyPurge bool `toml:"deny_purge"`
AllowRollup bool `toml:"allow_rollup_hdrs"`
// Compression accepts "none" (also used when empty) or "s2".
Compression string `toml:"compression"`
FirstSeq uint64 `toml:"first_seq"`
SubjectTransform *jetstream.SubjectTransformConfig `toml:"subject_transform"`
RePublish *jetstream.RePublish `toml:"republish"`
AllowDirect bool `toml:"allow_direct"`
MirrorDirect bool `toml:"mirror_direct"`
ConsumerLimits jetstream.StreamConsumerLimits `toml:"consumer_limits"`
Metadata map[string]string `toml:"metadata"`
}
func (*NATS) SampleConfig() string {
@ -85,8 +129,125 @@ func (n *NATS) Connect() error {
// try and connect
n.conn, err = nats.Connect(strings.Join(n.Servers, ","), opts...)
if err != nil {
return err
}
return err
if n.Jetstream != nil {
n.jetstreamClient, err = jetstream.New(n.conn)
if err != nil {
return fmt.Errorf("failed to connect to jetstream: %w", err)
}
_, err = n.jetstreamClient.CreateOrUpdateStream(context.Background(), *n.jetstreamStreamConfig)
if err != nil {
return fmt.Errorf("failed to create or update stream: %w", err)
}
n.Log.Infof("Stream (%s) successfully created or updated", n.Jetstream.Name)
}
return nil
}
// getJetstreamConfig converts the TOML-facing settings stored in n.Jetstream
// into the jetstream.StreamConfig understood by the NATS client.
//
// The string options are validated in the order retention, discard, storage,
// compression; the first unknown value yields an error naming the offending
// setting. An empty string selects the default of each option ("limits",
// "old", "file" and "none" respectively).
func (n *NATS) getJetstreamConfig() (*jetstream.StreamConfig, error) {
	js := n.Jetstream

	retentionByName := map[string]jetstream.RetentionPolicy{
		"":          jetstream.LimitsPolicy,
		"limits":    jetstream.LimitsPolicy,
		"interest":  jetstream.InterestPolicy,
		"workqueue": jetstream.WorkQueuePolicy,
	}
	retentionPolicy, known := retentionByName[js.Retention]
	if !known {
		return nil, fmt.Errorf("invalid 'retention' setting %q", js.Retention)
	}

	discardByName := map[string]jetstream.DiscardPolicy{
		"":    jetstream.DiscardOld,
		"old": jetstream.DiscardOld,
		"new": jetstream.DiscardNew,
	}
	discardPolicy, known := discardByName[js.Discard]
	if !known {
		return nil, fmt.Errorf("invalid 'discard' setting %q", js.Discard)
	}

	storageByName := map[string]jetstream.StorageType{
		"":       jetstream.FileStorage,
		"file":   jetstream.FileStorage,
		"memory": jetstream.MemoryStorage,
	}
	storageType, known := storageByName[js.Storage]
	if !known {
		return nil, fmt.Errorf("invalid 'storage' setting %q", js.Storage)
	}

	compressionByName := map[string]jetstream.StoreCompression{
		"":     jetstream.NoCompression,
		"none": jetstream.NoCompression,
		"s2":   jetstream.S2Compression,
	}
	storeCompression, known := compressionByName[js.Compression]
	if !known {
		return nil, fmt.Errorf("invalid 'compression' setting %q", js.Compression)
	}

	// Everything else is copied field by field; only the two durations need
	// a type conversion.
	return &jetstream.StreamConfig{
		Name:                 js.Name,
		Description:          js.Description,
		Subjects:             js.Subjects,
		Retention:            retentionPolicy,
		MaxConsumers:         js.MaxConsumers,
		MaxMsgs:              js.MaxMsgs,
		MaxBytes:             js.MaxBytes,
		Discard:              discardPolicy,
		DiscardNewPerSubject: js.DiscardNewPerSubject,
		MaxAge:               time.Duration(js.MaxAge),
		MaxMsgsPerSubject:    js.MaxMsgsPerSubject,
		MaxMsgSize:           js.MaxMsgSize,
		Storage:              storageType,
		Replicas:             js.Replicas,
		NoAck:                js.NoAck,
		Template:             js.Template,
		Duplicates:           time.Duration(js.Duplicates),
		Placement:            js.Placement,
		Mirror:               js.Mirror,
		Sources:              js.Sources,
		Sealed:               js.Sealed,
		DenyDelete:           js.DenyDelete,
		DenyPurge:            js.DenyPurge,
		AllowRollup:          js.AllowRollup,
		Compression:          storeCompression,
		FirstSeq:             js.FirstSeq,
		SubjectTransform:     js.SubjectTransform,
		RePublish:            js.RePublish,
		AllowDirect:          js.AllowDirect,
		MirrorDirect:         js.MirrorDirect,
		ConsumerLimits:       js.ConsumerLimits,
		Metadata:             js.Metadata,
	}, nil
}
// Init validates the optional Jetstream settings and prepares the stream
// configuration that Connect later hands to the server. It does nothing when
// no jetstream table was configured.
func (n *NATS) Init() error {
	if n.Jetstream == nil {
		return nil
	}

	if strings.TrimSpace(n.Jetstream.Name) == "" {
		return errors.New("stream cannot be empty")
	}

	// The stream must always cover the subject we publish to: use it as the
	// only subject when none are given, otherwise add it as the last element
	// unless it is already listed.
	switch {
	case len(n.Jetstream.Subjects) == 0:
		n.Jetstream.Subjects = []string{n.Subject}
	case !choice.Contains(n.Subject, n.Jetstream.Subjects):
		n.Jetstream.Subjects = append(n.Jetstream.Subjects, n.Subject)
	}

	streamCfg, err := n.getJetstreamConfig()
	n.jetstreamStreamConfig = streamCfg
	if err != nil {
		return fmt.Errorf("failed to parse jetstream config: %w", err)
	}

	return nil
}
func (n *NATS) Close() error {
@ -98,14 +259,13 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
for _, metric := range metrics {
buf, err := n.serializer.Serialize(metric)
if err != nil {
n.Log.Debugf("Could not serialize metric: %v", err)
continue
}
// use the same Publish API for nats core and jetstream
err = n.conn.Publish(n.Subject, buf)
if err != nil {
return fmt.Errorf("FAILED to send NATS message: %w", err)

View File

@ -1,12 +1,21 @@
package nats
import (
"context"
_ "embed"
"fmt"
"path/filepath"
"testing"
"time"
"github.com/docker/go-connections/nat"
"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@ -15,32 +24,162 @@ func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
servicePort := "4222"
container := testutil.Container{
Image: "nats",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForLog("Server is ready"),
natsServicePort := "4222"
type testConfig struct {
name string
container testutil.Container
nats *NATS
streamConfigCompareFunc func(*testing.T, *jetstream.StreamInfo)
wantErr bool
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer container.Terminate()
server := []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports[servicePort])}
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
n := &NATS{
Servers: server,
Name: "telegraf",
Subject: "telegraf",
serializer: serializer,
testCases := []testConfig{
{
name: "valid without jetstream",
container: testutil.Container{
Image: "nats:latest",
ExposedPorts: []string{natsServicePort},
WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)),
},
nats: &NATS{
Name: "telegraf",
Subject: "telegraf",
serializer: &influx.Serializer{},
Log: testutil.Logger{},
},
},
{
name: "valid with jetstream",
container: testutil.Container{
Image: "nats:latest",
ExposedPorts: []string{natsServicePort},
Cmd: []string{"--js"},
WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)),
},
nats: &NATS{
Name: "telegraf",
Subject: "telegraf",
Jetstream: &StreamConfig{
Name: "my-telegraf-stream",
},
serializer: &influx.Serializer{},
Log: testutil.Logger{},
},
streamConfigCompareFunc: func(t *testing.T, si *jetstream.StreamInfo) {
require.Equal(t, "my-telegraf-stream", si.Config.Name)
require.Equal(t, []string{"telegraf"}, si.Config.Subjects)
},
},
{
name: "create stream with config",
container: testutil.Container{
Image: "nats:latest",
ExposedPorts: []string{natsServicePort},
Cmd: []string{"--js"},
WaitingFor: wait.ForListeningPort(nat.Port(natsServicePort)),
},
nats: &NATS{
Name: "telegraf",
Subject: "my-tel-sub-outer",
Jetstream: &StreamConfig{
Name: "telegraf-stream-with-cfg",
Subjects: []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2"},
Retention: "workqueue",
MaxConsumers: 10,
Discard: "new",
Storage: "memory",
MaxMsgs: 100_000,
MaxBytes: 104_857_600,
MaxAge: config.Duration(10 * time.Minute),
Duplicates: config.Duration(5 * time.Minute),
MaxMsgSize: 120,
MaxMsgsPerSubject: 500,
},
serializer: &influx.Serializer{},
Log: testutil.Logger{},
},
streamConfigCompareFunc: func(t *testing.T, si *jetstream.StreamInfo) {
require.Equal(t, "telegraf-stream-with-cfg", si.Config.Name)
require.Equal(t, []string{"my-tel-sub0", "my-tel-sub1", "my-tel-sub2", "my-tel-sub-outer"}, si.Config.Subjects)
require.Equal(t, jetstream.WorkQueuePolicy, si.Config.Retention)
require.Equal(t, 10, si.Config.MaxConsumers)
require.Equal(t, jetstream.DiscardNew, si.Config.Discard)
require.Equal(t, jetstream.MemoryStorage, si.Config.Storage)
require.Equal(t, int64(100_000), si.Config.MaxMsgs)
require.Equal(t, int64(104_857_600), si.Config.MaxBytes)
require.Equal(t, 10*time.Minute, si.Config.MaxAge)
require.Equal(t, 5*time.Minute, si.Config.Duplicates)
require.Equal(t, int32(120), si.Config.MaxMsgSize)
require.Equal(t, int64(500), si.Config.MaxMsgsPerSubject)
},
},
}
// Verify that we can connect to the NATS daemon
err = n.Connect()
require.NoError(t, err)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.container.Start()
require.NoError(t, err, "failed to start container")
defer tc.container.Terminate()
// Verify that we can successfully write data to the NATS daemon
err = n.Write(testutil.MockMetrics())
require.NoError(t, err)
server := []string{fmt.Sprintf("nats://%s:%s", tc.container.Address, tc.container.Ports[natsServicePort])}
tc.nats.Servers = server
// Verify that we can connect to the NATS daemon
require.NoError(t, tc.nats.Init())
err = tc.nats.Connect()
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
if tc.nats.Jetstream != nil {
stream, err := tc.nats.jetstreamClient.Stream(context.Background(), tc.nats.Jetstream.Name)
require.NoError(t, err)
si, err := stream.Info(context.Background())
require.NoError(t, err)
tc.streamConfigCompareFunc(t, si)
}
// Verify that we can successfully write data to the NATS daemon
err = tc.nats.Write(testutil.MockMetrics())
require.NoError(t, err)
})
}
}
// TestConfigParsing loads every sample configuration below testcases/ and
// checks that initializing the single configured output succeeds or fails
// as the case expects.
func TestConfigParsing(t *testing.T) {
	type parseCase struct {
		name    string
		path    string
		wantErr bool
	}

	cases := []parseCase{
		{name: "Valid Default", path: filepath.Join("testcases", "no-js.conf")},
		{name: "Valid JS", path: filepath.Join("testcases", "js-default.conf")},
		{name: "Valid JS Config", path: filepath.Join("testcases", "js-config.conf")},
		{name: "Subjects warning", path: filepath.Join("testcases", "js-subjects.conf")},
		{name: "Invalid JS", path: filepath.Join("testcases", "js-no-stream.conf"), wantErr: true},
	}

	// Make the plugin known to the config loader.
	outputs.Add("nats", func() telegraf.Output {
		return &NATS{}
	})

	serializer := &influx.Serializer{}
	require.NoError(t, serializer.Init())

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			// Load the file; it must define exactly one output.
			cfg := config.NewConfig()
			require.NoError(t, cfg.LoadConfig(c.path))
			require.Len(t, cfg.Outputs, 1)

			initErr := cfg.Outputs[0].Init()
			if !c.wantErr {
				require.NoError(t, initErr)
				return
			}
			require.Error(t, initErr)
		})
	}
}

View File

@ -14,6 +14,7 @@
# credentials = "/etc/telegraf/nats.creds"
## NATS subject for producer messages
## For jetstream this is also the subject where messages will be published
subject = "telegraf"
## Use Transport Layer Security
@ -31,3 +32,30 @@
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
## Jetstream specific configuration. If this table is present, the plugin assumes a Jetstream context.
## Since this is a table, it should be placed at the end of the plugin section. Otherwise, use the inline table format.
# [outputs.nats.jetstream]
## Name of the stream, required when using jetstream. Telegraf will
## use the union of the subject above and the subjects array below.
# name = ""
# subjects = []
## Full jetstream create stream config, refer: https://docs.nats.io/nats-concepts/jetstream/streams
# retention = "limits"
# max_consumers = -1
# max_msgs_per_subject = -1
# max_msgs = -1
# max_bytes = -1
# max_age = 0
# max_msg_size = -1
# storage = "file"
# discard = "old"
# num_replicas = 1
# duplicate_window = 120000000000
# sealed = false
# deny_delete = false
# deny_purge = false
# allow_rollup_hdrs = false
# allow_direct = true
# mirror_direct = false

View File

@ -0,0 +1,16 @@
## NATS output with jetstream stream config provided
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
[outputs.nats.jetstream]
name = "my-telegraf-stream"
retention = "workqueue"
max_consumers = 10
discard = "old"
storage = "memory"
max_msgs = 100000
max_bytes = 104857600 # 100 MB
max_age = 86400000000000 # in the int64 format
num_replicas = 1

View File

@ -0,0 +1,8 @@
## NATS output with jetstream, but no stream config provided (use defaults)
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
[outputs.nats.jetstream]
name = "my-telegraf-stream"

View File

@ -0,0 +1,7 @@
## NATS output with jetstream, but empty stream
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
[outputs.nats.jetstream]

View File

@ -0,0 +1,9 @@
## NATS output with jetstream, but subject mismatch
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"
[outputs.nats.jetstream]
name = "my-stream"
subjects = ["not", "here"]

View File

@ -0,0 +1,6 @@
## NATS output without jetstream
[[outputs.nats]]
## URLs of NATS servers
servers = ["nats://localhost:4222"]
subject = "telegraf-subject"
data_format = "influx"