add stream support (#24)

* Implement the stream support and filtering for the stream
---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2025-02-13 12:04:35 +01:00 committed by GitHub
parent 707fe72c3d
commit 031a2ac54d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 601 additions and 31 deletions

View File

@ -70,9 +70,7 @@ func main() {
// Create a consumer to receive messages from the queue
// you need to build the address of the queue, but you can use the helper function
consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
Queue: queueName,
}, "getting-started-consumer")
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return
@ -115,7 +113,7 @@ func main() {
return
}
for i := 0; i < 1_000; i++ {
for i := 0; i < 100; i++ {
// Publish a message to the exchange
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {

View File

@ -77,9 +77,7 @@ func main() {
return
}
consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
Queue: queueName,
}, "reliable-consumer")
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return

View File

@ -87,14 +87,17 @@ func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAdd
}
// NewConsumer creates a new Consumer that listens to the provided destination. Destination is a QueueAddress.
func (a *AmqpConnection) NewConsumer(ctx context.Context, destination *QueueAddress, linkName string) (*Consumer, error) {
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error) {
destination := &QueueAddress{
Queue: queueName,
}
destinationAdd, err := destination.toAddress()
if err != nil {
return nil, err
}
err = validateAddress(destinationAdd)
return newConsumer(ctx, a, destinationAdd, linkName)
return newConsumer(ctx, a, destinationAdd, options)
}
// Dial connect to the AMQP 1.0 server using the provided connectionSettings

View File

@ -43,9 +43,8 @@ var _ = Describe("Recovery connection test", func() {
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{
Queue: qName,
}, "test")
consumer, err := connection.NewConsumer(context.Background(),
qName, nil)
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{
Queue: qName,

View File

@ -67,21 +67,33 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio
type Consumer struct {
receiver atomic.Pointer[amqp.Receiver]
connection *AmqpConnection
linkName string
options ConsumerOptions
destinationAdd string
id string
/*
currentOffset is the current offset of the consumer. It is valid only for the stream consumers.
it is used to keep track of the last message that was consumed by the consumer.
so in case of restart the consumer can start from the last message that was consumed.
For the AMQP queues it is just ignored.
*/
currentOffset int64
}
func (c *Consumer) Id() string {
return c.id
}
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, linkName string, args ...string) (*Consumer, error) {
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options ConsumerOptions, args ...string) (*Consumer, error) {
id := fmt.Sprintf("consumer-%s", uuid.New().String())
if len(args) > 0 {
id = args[0]
}
r := &Consumer{connection: connection, linkName: linkName, destinationAdd: destinationAdd, id: id}
r := &Consumer{connection: connection, options: options,
destinationAdd: destinationAdd,
currentOffset: -1,
id: id}
connection.entitiesTracker.storeOrReplaceConsumer(r)
err := r.createReceiver(ctx)
if err != nil {
@ -91,7 +103,24 @@ func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd
}
func (c *Consumer) createReceiver(ctx context.Context) error {
receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, createReceiverLinkOptions(c.destinationAdd, c.linkName, AtLeastOnce))
if c.currentOffset >= 0 {
// here it means that the consumer is a stream consumer and there is a restart.
// so we need to set the offset to the last consumed message in order to restart from there.
// In there is not a restart this code won't be executed.
if c.options != nil {
// here we assume it is a stream. So we recreate the options with the offset.
c.options = &StreamConsumerOptions{
ReceiverLinkName: c.options.linkName(),
InitialCredits: c.options.initialCredits(),
// we increment the offset by one to start from the next message.
// because the current was already consumed.
Offset: &OffsetValue{Offset: uint64(c.currentOffset + 1)},
}
}
}
receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd,
createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce))
if err != nil {
return err
}
@ -105,6 +134,12 @@ func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
if err != nil {
return nil, err
}
if msg != nil && msg.Annotations != nil && msg.Annotations["x-stream-offset"] != nil {
// keep track of the current offset of the consumer
c.currentOffset = msg.Annotations["x-stream-offset"].(int64)
}
return &DeliveryContext{receiver: c.receiver.Load(), message: msg}, nil
}

View File

@ -0,0 +1,318 @@
package rabbitmq_amqp
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
testhelper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper"
"strconv"
"time"
)
func publishMessagesWithStreamTag(queueName string, filterValue string, count int) {
conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil)
Expect(err).To(BeNil())
publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queueName}, "producer_filter_stream")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
for i := 0; i < count; i++ {
body := filterValue + " #" + strconv.Itoa(i)
msg := amqp.NewMessage([]byte(body))
msg.Annotations = amqp.Annotations{
"x-stream-filter-value": filterValue,
}
publishResult, err := publisher.Publish(context.TODO(), msg)
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
}
err = conn.Close(context.TODO())
Expect(err).To(BeNil())
}
var _ = Describe("Consumer stream test", func() {
It("start consuming with different offset types", func() {
/*
Test the different offset types for stream consumers
1. OffsetValue
2. OffsetFirst
3. OffsetLast
4. OffsetNext
With 10 messages in the queue, the test will create a consumer with different offset types
the test 1, 2, 4 can be deterministic. The test 3 can't be deterministic (in this test),
but we can check if there is at least one message, and it is not the first one.
It is enough to verify the functionality of the offset types.
*/
qName := generateName("start consuming with different offset types")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.name).To(Equal(qName))
publishMessages(qName, 10)
consumerOffsetValue, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "offset_value_test",
InitialCredits: 1,
Offset: &OffsetValue{Offset: 5},
})
Expect(err).To(BeNil())
Expect(consumerOffsetValue).NotTo(BeNil())
Expect(consumerOffsetValue).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 5; i++ {
dc, err := consumerOffsetValue.Receive(context.Background())
Expect(err).To(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i+5)))
Expect(dc.Accept(context.Background())).To(BeNil())
}
consumerFirst, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
Offset: &OffsetFirst{},
})
Expect(err).To(BeNil())
Expect(consumerFirst).NotTo(BeNil())
Expect(consumerFirst).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 10; i++ {
dc, err := consumerFirst.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i)))
Expect(dc.Accept(context.Background())).To(BeNil())
}
// the tests Last and Next can't be deterministic
// but we can check if there is at least one message, and it is not the first one
consumerLast, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumerLast_test",
InitialCredits: 10,
Offset: &OffsetLast{},
})
Expect(err).To(BeNil())
Expect(consumerLast).NotTo(BeNil())
Expect(consumerLast).To(BeAssignableToTypeOf(&Consumer{}))
// it should receive at least one message
dc, err := consumerLast.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).NotTo(Equal(fmt.Sprintf("Message #%d", 0)))
Expect(dc.Accept(context.Background())).To(BeNil())
consumerNext, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumerNext_next",
InitialCredits: 10,
Offset: &OffsetNext{},
})
Expect(err).To(BeNil())
Expect(consumerNext).NotTo(BeNil())
Expect(consumerNext).To(BeAssignableToTypeOf(&Consumer{}))
signal := make(chan struct{})
go func() {
// it should receive the next message
dc, err = consumerNext.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal("the next message"))
Expect(dc.Accept(context.Background())).To(BeNil())
signal <- struct{}{}
}()
publishMessages(qName, 1, "the next message")
<-signal
Expect(consumerLast.Close(context.Background())).To(BeNil())
Expect(consumerOffsetValue.Close(context.Background())).To(BeNil())
Expect(consumerFirst.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("consumer should restart form the last offset in case of disconnection", func() {
/*
Test the consumer should restart form the last offset in case of disconnection
So we send 10 messages. Consume 5 then kill the connection and the consumer should restart form
the offset 5 to consume the messages
*/
qName := generateName("consumer should restart form the last offset in case of disconnection")
connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{
SASLType: amqp.SASLTypeAnonymous(),
ContainerID: qName,
RecoveryConfiguration: &RecoveryConfiguration{
ActiveRecovery: true,
// reduced the reconnect interval to speed up the test.
// don't use low values in production
BackOffReconnectInterval: 1_001 * time.Millisecond,
MaxReconnectAttempts: 5,
},
})
Expect(err).To(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.name).To(Equal(qName))
publishMessages(qName, 10)
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer should restart form the last offset in case of disconnection",
InitialCredits: 5,
Offset: &OffsetFirst{},
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 5; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i)))
Expect(dc.Accept(context.Background())).To(BeNil())
}
Eventually(func() bool {
err := testhelper.DropConnectionContainerID(qName)
return err == nil
}).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue())
time.Sleep(1 * time.Second)
Eventually(func() bool {
conn, err := testhelper.GetConnectionByContainerID(qName)
return err == nil && conn != nil
}).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue())
time.Sleep(500 * time.Millisecond)
// the consumer should restart from the last offset
for i := 5; i < 10; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i)))
}
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("consumer should filter messages based on x-stream-filter", func() {
qName := generateName("consumer should filter messages based on x-stream-filter")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.name).To(Equal(qName))
publishMessagesWithStreamTag(qName, "banana", 10)
publishMessagesWithStreamTag(qName, "apple", 10)
publishMessagesWithStreamTag(qName, "", 10)
consumerBanana, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer banana should filter messages based on x-stream-filter",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"banana"},
})
Expect(err).To(BeNil())
Expect(consumerBanana).NotTo(BeNil())
Expect(consumerBanana).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 10; i++ {
dc, err := consumerBanana.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i)))
Expect(dc.Accept(context.Background())).To(BeNil())
}
consumerApple, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"apple"},
FilterMatchUnfiltered: true,
})
Expect(err).To(BeNil())
Expect(consumerApple).NotTo(BeNil())
Expect(consumerApple).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 10; i++ {
dc, err := consumerApple.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i)))
Expect(dc.Accept(context.Background())).To(BeNil())
}
consumerAppleAndBanana, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer apple and banana should filter messages based on x-stream-filter",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"apple", "banana"},
})
Expect(err).To(BeNil())
Expect(consumerAppleAndBanana).NotTo(BeNil())
Expect(consumerAppleAndBanana).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 20; i++ {
dc, err := consumerAppleAndBanana.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
if i < 10 {
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i)))
} else {
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i-10)))
}
Expect(dc.Accept(context.Background())).To(BeNil())
}
consumerAppleMatchUnfiltered, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter and FilterMatchUnfiltered true",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"apple"},
FilterMatchUnfiltered: true,
})
Expect(err).To(BeNil())
Expect(consumerAppleMatchUnfiltered).NotTo(BeNil())
Expect(consumerAppleMatchUnfiltered).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 20; i++ {
dc, err := consumerAppleMatchUnfiltered.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
if i < 10 {
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i)))
} else {
Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf(" #%d", i-10)))
}
Expect(dc.Accept(context.Background())).To(BeNil())
}
Expect(consumerApple.Close(context.Background())).To(BeNil())
Expect(consumerBanana.Close(context.Background())).To(BeNil())
Expect(consumerAppleAndBanana.Close(context.Background())).To(BeNil())
Expect(consumerAppleMatchUnfiltered.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
})

View File

@ -24,9 +24,7 @@ var _ = Describe("NewConsumer tests", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
cancel()
_, err = connection.NewConsumer(ctx, &QueueAddress{
Queue: qName,
}, "test")
_, err = connection.NewConsumer(ctx, qName, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("context canceled"))
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
@ -43,7 +41,7 @@ var _ = Describe("NewConsumer tests", func() {
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 10)
consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
consumer, err := connection.NewConsumer(context.Background(), qName, nil)
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
@ -72,7 +70,7 @@ var _ = Describe("NewConsumer tests", func() {
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 1)
consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
consumer, err := connection.NewConsumer(context.Background(), qName, nil)
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
@ -99,7 +97,7 @@ var _ = Describe("NewConsumer tests", func() {
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 1)
consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
consumer, err := connection.NewConsumer(context.Background(), qName, nil)
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
@ -134,7 +132,7 @@ var _ = Describe("NewConsumer tests", func() {
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 2)
consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
consumer, err := connection.NewConsumer(context.Background(), qName, nil)
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))

View File

@ -32,7 +32,7 @@ func NewAmqpManagement() *AmqpManagement {
}
func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
opts := createReceiverLinkOptions(managementNodeAddress, linkPairName, AtMostOnce)
opts := createReceiverLinkOptions(managementNodeAddress, &managementOptions{}, AtMostOnce)
receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts)
if err != nil {
return err

View File

@ -216,7 +216,7 @@ var _ = Describe("AMQP Queue test ", func() {
})
})
func publishMessages(queueName string, count int) {
func publishMessages(queueName string, count int, args ...string) {
conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil)
Expect(err).To(BeNil())
@ -225,7 +225,12 @@ func publishMessages(queueName string, count int) {
Expect(publisher).NotTo(BeNil())
for i := 0; i < count; i++ {
publishResult, err := publisher.Publish(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))))
body := "Message #" + strconv.Itoa(i)
if len(args) > 0 {
body = args[0]
}
publishResult, err := publisher.Publish(context.TODO(), amqp.NewMessage([]byte(body)))
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))

View File

@ -0,0 +1,177 @@
package rabbitmq_amqp
import (
"github.com/Azure/go-amqp"
"github.com/google/uuid"
)
type linkerName interface {
linkName() string
}
func getLinkName(l linkerName) string {
if l == nil || l.linkName() == "" {
return uuid.New().String()
}
return l.linkName()
}
/// ConsumerOptions interface for the AMQP and Stream consumer///
type ConsumerOptions interface {
// linkName returns the name of the link
// if not set it will return a random UUID
linkName() string
// initialCredits returns the initial credits for the link
// if not set it will return 256
initialCredits() int32
// linkFilters returns the link filters for the link.
// It is mostly used for the stream consumers.
linkFilters() []amqp.LinkFilter
}
func getInitialCredits(co ConsumerOptions) int32 {
if co == nil || co.initialCredits() == 0 {
return 256
}
return co.initialCredits()
}
func getLinkFilters(co ConsumerOptions) []amqp.LinkFilter {
if co == nil {
return nil
}
return co.linkFilters()
}
type managementOptions struct {
}
func (mo *managementOptions) linkName() string {
return linkPairName
}
func (mo *managementOptions) initialCredits() int32 {
// by default i 256 but here we set it to 100. For the management is enough.
return 100
}
func (mo *managementOptions) linkFilters() []amqp.LinkFilter {
return nil
}
type AMQPConsumerOptions struct {
//ReceiverLinkName: see the ConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the ConsumerOptions interface
InitialCredits int32
}
func (aco *AMQPConsumerOptions) linkName() string {
return aco.ReceiverLinkName
}
func (aco *AMQPConsumerOptions) initialCredits() int32 {
return aco.InitialCredits
}
func (aco *AMQPConsumerOptions) linkFilters() []amqp.LinkFilter {
return nil
}
type OffsetSpecification interface {
toLinkFilter() amqp.LinkFilter
}
const rmqStreamFilter = "rabbitmq:stream-filter"
const rmqStreamOffsetSpec = "rabbitmq:stream-offset-spec"
const rmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered"
const offsetFirst = "first"
const offsetNext = "next"
const offsetLast = "last"
type OffsetFirst struct {
}
func (of *OffsetFirst) toLinkFilter() amqp.LinkFilter {
return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetFirst)
}
type OffsetLast struct {
}
func (ol *OffsetLast) toLinkFilter() amqp.LinkFilter {
return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetLast)
}
type OffsetValue struct {
Offset uint64
}
func (ov *OffsetValue) toLinkFilter() amqp.LinkFilter {
return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, ov.Offset)
}
type OffsetNext struct {
}
func (on *OffsetNext) toLinkFilter() amqp.LinkFilter {
return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetNext)
}
/*
StreamConsumerOptions represents the options that can be used to create a stream consumer.
It is mandatory in case of creating a stream consumer.
*/
type StreamConsumerOptions struct {
//ReceiverLinkName: see the ConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the ConsumerOptions interface
InitialCredits int32
// The offset specification for the stream consumer
// see the interface implementations
Offset OffsetSpecification
// Filter values.
// See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions for more details
Filters []string
//
FilterMatchUnfiltered bool
}
func (sco *StreamConsumerOptions) linkName() string {
return sco.ReceiverLinkName
}
func (sco *StreamConsumerOptions) initialCredits() int32 {
return sco.InitialCredits
}
func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter {
var filters []amqp.LinkFilter
filters = append(filters, sco.Offset.toLinkFilter())
if sco.Filters != nil {
l := []any{}
for _, f := range sco.Filters {
l = append(l, f)
}
filters = append(filters, amqp.NewLinkFilter(rmqStreamFilter, 0, l))
filters = append(filters, amqp.NewLinkFilter(rmqStreamMatchUnfiltered, 0, sco.FilterMatchUnfiltered))
}
return filters
}
///// ProducerOptions /////
type ProducerOptions interface {
linkName() string
}
type AMQPProducerOptions struct {
SenderLinkName string
}
func (apo *AMQPProducerOptions) linkName() string {
return apo.SenderLinkName
}

View File

@ -40,7 +40,7 @@ func createSenderLinkOptions(address string, linkName string, deliveryMode int)
// receiverLinkOptions returns the options for a receiver link
// with the given address and link name.
// That should be the same for all the links.
func createReceiverLinkOptions(address string, linkName string, deliveryMode int) *amqp.ReceiverOptions {
func createReceiverLinkOptions(address string, options ConsumerOptions, deliveryMode int) *amqp.ReceiverOptions {
prop := make(map[string]any)
prop["paired"] = true
receiverSettleMode := amqp.SenderSettleModeSettled.Ptr()
@ -52,18 +52,20 @@ func createReceiverLinkOptions(address string, linkName string, deliveryMode int
receiverSettleMode = amqp.SenderSettleModeUnsettled.Ptr()
}
return &amqp.ReceiverOptions{
result := &amqp.ReceiverOptions{
TargetAddress: address,
DynamicAddress: false,
Name: linkName,
Name: getLinkName(options),
Properties: prop,
Durability: 0,
ExpiryTimeout: 0,
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
RequestedSenderSettleMode: receiverSettleMode,
ExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
Credit: 100,
Credit: getInitialCredits(options),
Filters: getLinkFilters(options),
}
return result
}
func random(max int) int {

View File

@ -284,7 +284,44 @@ func (a *AutoGeneratedQueueSpecification) buildArguments() map[string]any {
result["x-queue-type"] = a.queueType().String()
return result
}
type StreamQueueSpecification struct {
Name string
MaxLengthBytes int64
InitialClusterSize int
}
func (s *StreamQueueSpecification) name() string {
return s.Name
}
func (s *StreamQueueSpecification) isAutoDelete() bool {
return false
}
func (s *StreamQueueSpecification) isExclusive() bool {
return false
}
func (s *StreamQueueSpecification) queueType() QueueType {
return QueueType{Type: Stream}
}
func (s *StreamQueueSpecification) buildArguments() map[string]any {
result := map[string]any{}
if s.MaxLengthBytes != 0 {
result["x-max-length-bytes"] = s.MaxLengthBytes
}
if s.InitialClusterSize != 0 {
result["x-stream-initial-cluster-size"] = s.InitialClusterSize
}
result["x-queue-type"] = s.queueType().String()
return result
}
// / **** Exchange ****