Implement Filters (#38)

* Closes: Implement properties-filter and application-properties-filter #25
* Refactor the interfaces to be more coherent
---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2025-02-27 13:58:59 +01:00 committed by GitHub
parent 8ffd1e6fc3
commit 24649319d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 629 additions and 195 deletions

View File

@ -12,6 +12,7 @@ format:
vet:
go vet ./pkg/rabbitmqamqp
go vet ./docs/examples/...
STATICCHECK ?= $(GOBIN)/staticcheck
STATICCHECK_VERSION ?= latest
@ -19,6 +20,7 @@ $(STATICCHECK):
go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION)
check: $(STATICCHECK)
$(STATICCHECK) ./pkg/rabbitmqamqp
$(STATICCHECK) ./docs/examples/...

View File

@ -15,7 +15,7 @@ Suitable for testing in pre-production environments.
## Documentation
- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) (work in progress for this client)
- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries)

View File

@ -112,7 +112,7 @@ func main() {
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
Exchange: exchangeName,
Key: routingKey,
}, "getting-started-publisher")
}, nil)
if err != nil {
rmq.Error("Error creating publisher", err)
return
@ -129,17 +129,14 @@ func main() {
switch publishResult.Outcome.(type) {
case *rmq.StateAccepted:
rmq.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0])
break
case *rmq.StateReleased:
rmq.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0])
break
case *rmq.StateRejected:
rmq.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*rmq.StateRejected)
if stateType.Error != nil {
rmq.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes

View File

@ -31,7 +31,7 @@ func main() {
}
// create a publisher without a target
publisher, err := amqpConnection.NewPublisher(context.TODO(), nil, "stream-publisher")
publisher, err := amqpConnection.NewPublisher(context.TODO(), nil, nil)
checkError(err)
// publish messages to the stream
@ -55,7 +55,6 @@ func main() {
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
break
default:
rmq.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0])
}

View File

@ -118,7 +118,7 @@ func main() {
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{
Queue: queueName,
}, "reliable-publisher")
}, nil)
if err != nil {
rmq.Error("Error creating publisher", err)
return
@ -147,13 +147,10 @@ func main() {
switch publishResult.Outcome.(type) {
case *rmq.StateAccepted:
atomic.AddInt32(&stateAccepted, 1)
break
case *rmq.StateReleased:
atomic.AddInt32(&stateReleased, 1)
break
case *rmq.StateRejected:
atomic.AddInt32(&stateRejected, 1)
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes

View File

@ -35,7 +35,7 @@ func main() {
// create a stream publisher. In this case we use the QueueAddress to make the example
// simple. So we use the default exchange here.
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, "stream-publisher")
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, nil)
checkError(err)
// publish messages to the stream
@ -47,17 +47,14 @@ func main() {
switch publishResult.Outcome.(type) {
case *rmq.StateAccepted:
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
break
case *rmq.StateReleased:
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
break
case *rmq.StateRejected:
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*rmq.StateRejected)
if stateType.Error != nil {
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes

View File

@ -17,6 +17,7 @@ func checkError(err error) {
func main() {
// see also: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions
rmq.Info("Golang AMQP 1.0 Streams example with filtering")
queueStream := "stream-go-queue-filtering-" + time.Now().String()
env := rmq.NewEnvironment([]string{"amqp://"}, nil)
@ -35,7 +36,7 @@ func main() {
// create a stream publisher. In this case we use the QueueAddress to make the example
// simple. So we use the default exchange here.
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, "stream-publisher")
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, nil)
checkError(err)
filters := []string{"MyFilter1", "MyFilter2", "MyFilter3", "MyFilter4"}
@ -50,17 +51,14 @@ func main() {
switch publishResult.Outcome.(type) {
case *rmq.StateAccepted:
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
break
case *rmq.StateReleased:
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
break
case *rmq.StateRejected:
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*rmq.StateRejected)
if stateType.Error != nil {
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
@ -76,7 +74,25 @@ func main() {
// add a filter to the consumer, in this case we use only the filter values
// MyFilter1 and MyFilter2. So all other messages won't be received
Filters: []string{"MyFilter1", "MyFilter2"},
StreamFilterOptions: &rmq.StreamFilterOptions{
Values: []string{"MyFilter1", "MyFilter2"},
// it is also possible to filter by application properties or message properties
// you can create filters like:
// msg.ApplicationProperties = map[string]interface{}{"key3": "value3"}
// during the publish you can do something like:
// msg.ApplicationProperties = map[string]interface{}{"key1": "value1"}
// publisher.Publish(context.Background(), msg)
//ApplicationProperties: nil,
// or here you can filter by message properties
// like:
// msg.Properties = &amqp.MessageProperties{Subject: "MySubject"}
// during the publish you can do something like:
// msg.Properties = &amqp.MessageProperties{Subject: "MySubject"}
// publisher.Publish(context.Background(), msg)
//Properties: nil,
// see amqp_consumer_stream_test.go for more examples
},
})
checkError(err)

View File

@ -60,7 +60,7 @@ func main() {
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
Exchange: exchangeName,
Key: routingKey,
}, "getting-started-publisher")
}, nil)
if err != nil {
rmq.Error("Error creating publisher", err)
return

View File

@ -6,9 +6,9 @@ import (
"strings"
)
// TargetAddress is an interface that represents an address that can be used to send messages to.
// ITargetAddress is an interface that represents an address that can be used to send messages to.
// It can be either a Queue or an Exchange with a routing key.
type TargetAddress interface {
type ITargetAddress interface {
toAddress() (string, error)
}

View File

@ -60,8 +60,3 @@ var _ = Describe("address builder test ", func() {
})
})
})
// Helper function to create string pointers
func stringPtr(s string) *string {
return &s
}

View File

@ -75,9 +75,10 @@ func (a *AmqpConnection) Properties() map[string]any {
}
// NewPublisher creates a new Publisher that sends messages to the provided destination.
// The destination is a TargetAddress that can be a Queue or an Exchange with a routing key.
// The destination is a ITargetAddress that can be a Queue or an Exchange with a routing key.
// options is an IPublisherOptions that can be used to configure the publisher.
// See QueueAddress and ExchangeAddress for more information.
func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAddress, linkName string) (*Publisher, error) {
func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAddress, options IPublisherOptions) (*Publisher, error) {
destinationAdd := ""
err := error(nil)
if destination != nil {
@ -91,11 +92,11 @@ func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAdd
}
}
return newPublisher(ctx, a, destinationAdd, linkName)
return newPublisher(ctx, a, destinationAdd, options)
}
// NewConsumer creates a new Consumer that listens to the provided destination. Destination is a QueueAddress.
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error) {
// NewConsumer creates a new Consumer that listens to the provided Queue
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error) {
destination := &QueueAddress{
Queue: queueName,
}
@ -362,7 +363,7 @@ func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged) {
a.lifeCycle.chStatusChanged = channel
}
func (a *AmqpConnection) State() LifeCycleState {
func (a *AmqpConnection) State() ILifeCycleState {
return a.lifeCycle.State()
}

View File

@ -49,19 +49,19 @@ func newEntitiesTracker() *entitiesTracker {
}
}
func (e *entitiesTracker) storeOrReplaceProducer(entity entityIdentifier) {
func (e *entitiesTracker) storeOrReplaceProducer(entity iEntityIdentifier) {
e.publishers.Store(entity.Id(), entity)
}
func (e *entitiesTracker) removeProducer(entity entityIdentifier) {
func (e *entitiesTracker) removeProducer(entity iEntityIdentifier) {
e.publishers.Delete(entity.Id())
}
func (e *entitiesTracker) storeOrReplaceConsumer(entity entityIdentifier) {
func (e *entitiesTracker) storeOrReplaceConsumer(entity iEntityIdentifier) {
e.consumers.Store(entity.Id(), entity)
}
func (e *entitiesTracker) removeConsumer(entity entityIdentifier) {
func (e *entitiesTracker) removeConsumer(entity iEntityIdentifier) {
e.consumers.Delete(entity.Id())
}

View File

@ -50,7 +50,7 @@ var _ = Describe("Recovery connection test", func() {
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{
Queue: qName,
}, "test")
}, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())

View File

@ -84,7 +84,11 @@ var _ = Describe("AMQP connection Test", func() {
})
Expect(err).To(BeNil())
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: queueName}, "test")
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: queueName},
&PublisherOptions{
Id: "my_id",
SenderLinkName: "my_sender_link",
})
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
consumer, err := connection.NewConsumer(context.Background(), queueName, nil)

View File

@ -68,7 +68,7 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio
type Consumer struct {
receiver atomic.Pointer[amqp.Receiver]
connection *AmqpConnection
options ConsumerOptions
options IConsumerOptions
destinationAdd string
id string
@ -85,10 +85,10 @@ func (c *Consumer) Id() string {
return c.id
}
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options ConsumerOptions, args ...string) (*Consumer, error) {
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options IConsumerOptions) (*Consumer, error) {
id := fmt.Sprintf("consumer-%s", uuid.New().String())
if len(args) > 0 {
id = args[0]
if options != nil && options.id() != "" {
id = options.id()
}
r := &Consumer{connection: connection, options: options,

View File

@ -7,31 +7,10 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
testhelper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper"
"strconv"
"sync"
"time"
)
func publishMessagesWithStreamTag(queueName string, filterValue string, count int) {
conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil)
Expect(err).To(BeNil())
publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queueName}, "producer_filter_stream")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
for i := 0; i < count; i++ {
body := filterValue + " #" + strconv.Itoa(i)
msg := NewMessageWithFilter([]byte(body), filterValue)
publishResult, err := publisher.Publish(context.TODO(), msg)
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
}
err = conn.Close(context.TODO())
Expect(err).To(BeNil())
}
var _ = Describe("Consumer stream test", func() {
It("start consuming with different offset types", func() {
@ -217,15 +196,34 @@ var _ = Describe("Consumer stream test", func() {
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.name).To(Equal(qName))
publishMessagesWithStreamTag(qName, "banana", 10)
publishMessagesWithStreamTag(qName, "apple", 10)
publishMessagesWithStreamTag(qName, "", 10)
publishMessagesWithMessageLogic(qName, "banana", 10, func(msg *amqp.Message) {
msg.Annotations = amqp.Annotations{
// here we set the filter value taken from the filters array
StreamFilterValue: "banana",
}
})
publishMessagesWithMessageLogic(qName, "apple", 10, func(msg *amqp.Message) {
msg.Annotations = amqp.Annotations{
// here we set the filter value taken from the filters array
StreamFilterValue: "apple",
}
})
publishMessagesWithMessageLogic(qName, "", 10, func(msg *amqp.Message) {
msg.Annotations = amqp.Annotations{
// here we set the filter value taken from the filters array
StreamFilterValue: "",
}
})
consumerBanana, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer banana should filter messages based on x-stream-filter",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"banana"},
StreamFilterOptions: &StreamFilterOptions{
Values: []string{"banana"},
},
})
Expect(err).To(BeNil())
@ -235,16 +233,18 @@ var _ = Describe("Consumer stream test", func() {
dc, err := consumerBanana.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i)))
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "banana")))
Expect(dc.Accept(context.Background())).To(BeNil())
}
consumerApple, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"apple"},
FilterMatchUnfiltered: true,
ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter",
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
Values: []string{"apple"},
MatchUnfiltered: true,
},
})
Expect(err).To(BeNil())
@ -254,7 +254,8 @@ var _ = Describe("Consumer stream test", func() {
dc, err := consumerApple.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i)))
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "apple")))
Expect(dc.Accept(context.Background())).To(BeNil())
}
@ -262,7 +263,9 @@ var _ = Describe("Consumer stream test", func() {
ReceiverLinkName: "consumer apple and banana should filter messages based on x-stream-filter",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"apple", "banana"},
StreamFilterOptions: &StreamFilterOptions{
Values: []string{"apple", "banana"},
},
})
Expect(err).To(BeNil())
@ -273,19 +276,23 @@ var _ = Describe("Consumer stream test", func() {
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
if i < 10 {
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i)))
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "banana")))
} else {
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i-10)))
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i-10, "apple")))
}
Expect(dc.Accept(context.Background())).To(BeNil())
}
consumerAppleMatchUnfiltered, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter and FilterMatchUnfiltered true",
InitialCredits: 200,
Offset: &OffsetFirst{},
Filters: []string{"apple"},
FilterMatchUnfiltered: true,
ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter and MatchUnfiltered true",
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
Values: []string{"apple"},
MatchUnfiltered: true,
},
})
Expect(err).To(BeNil())
@ -296,9 +303,9 @@ var _ = Describe("Consumer stream test", func() {
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
if i < 10 {
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i)))
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, "apple")))
} else {
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf(" #%d", i-10)))
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i-10, "")))
}
Expect(dc.Accept(context.Background())).To(BeNil())
}
@ -311,4 +318,263 @@ var _ = Describe("Consumer stream test", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
Describe("consumer should filter messages based on application properties", func() {
qName := generateName("consumer should filter messages based on application properties")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
publishMessagesWithMessageLogic(qName, "ignoredKey", 7, func(msg *amqp.Message) {
msg.ApplicationProperties = map[string]interface{}{"ignoredKey": "ignoredValue"}
})
publishMessagesWithMessageLogic(qName, "key1", 10, func(msg *amqp.Message) {
msg.ApplicationProperties = map[string]interface{}{"key1": "value1", "constFilterKey": "constFilterValue"}
})
publishMessagesWithMessageLogic(qName, "key2", 10, func(msg *amqp.Message) {
msg.ApplicationProperties = map[string]interface{}{"key2": "value2", "constFilterKey": "constFilterValue"}
})
publishMessagesWithMessageLogic(qName, "key3", 10, func(msg *amqp.Message) {
msg.ApplicationProperties = map[string]interface{}{"key3": "value3", "constFilterKey": "constFilterValue"}
})
var wg sync.WaitGroup
wg.Add(3)
DescribeTable("consumer should filter messages based on application properties", func(key string, value any, label string) {
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
ApplicationProperties: map[string]any{
key: value,
// this is a constant filter append during the publishMessagesWithApplicationProperties
// to test the multiple filters
"constFilterKey": "constFilterValue",
},
},
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 10; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, label)))
Expect(dc.message.ApplicationProperties).To(HaveKeyWithValue(key, value))
Expect(dc.Accept(context.Background())).To(BeNil())
}
Expect(consumer.Close(context.Background())).To(BeNil())
wg.Done()
},
Entry("key1 value1", "key1", "value1", "key1"),
Entry("key2 value2", "key2", "value2", "key2"),
Entry("key3 value3", "key3", "value3", "key3"),
)
go func() {
wg.Wait()
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
}()
})
Describe("consumer should filter messages based on properties", func() {
/*
Test the consumer should filter messages based on properties
*/
qName := generateName("consumer should filter messages based on properties")
qName += time.Now().String()
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
publishMessagesWithMessageLogic(qName, "MessageID", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{MessageID: "MessageID"}
})
publishMessagesWithMessageLogic(qName, "Subject", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{Subject: stringPtr("Subject")}
})
publishMessagesWithMessageLogic(qName, "ReplyTo", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{ReplyTo: stringPtr("ReplyTo")}
})
publishMessagesWithMessageLogic(qName, "ContentType", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{ContentType: stringPtr("ContentType")}
})
publishMessagesWithMessageLogic(qName, "ContentEncoding", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{ContentEncoding: stringPtr("ContentEncoding")}
})
publishMessagesWithMessageLogic(qName, "GroupID", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{GroupID: stringPtr("GroupID")}
})
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: stringPtr("ReplyToGroupID")}
})
// GroupSequence
publishMessagesWithMessageLogic(qName, "GroupSequence", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{GroupSequence: uint32Ptr(137)}
})
// ReplyToGroupID
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: stringPtr("ReplyToGroupID")}
})
// CreationTime
publishMessagesWithMessageLogic(qName, "CreationTime", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{CreationTime: timePtr(createDateTime())}
})
// AbsoluteExpiryTime
publishMessagesWithMessageLogic(qName, "AbsoluteExpiryTime", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{AbsoluteExpiryTime: timePtr(createDateTime())}
})
// CorrelationID
publishMessagesWithMessageLogic(qName, "CorrelationID", 10, func(msg *amqp.Message) {
msg.Properties = &amqp.MessageProperties{CorrelationID: "CorrelationID"}
})
var wg sync.WaitGroup
wg.Add(12)
DescribeTable("consumer should filter messages based on properties", func(properties *amqp.MessageProperties, label string) {
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
Properties: properties,
},
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 10; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, label)))
// we test one by one because of the date time fields
// It is not possible to compare the whole structure due of the time
// It is not perfect but it is enough for the test
if dc.message.Properties.MessageID != nil {
Expect(dc.message.Properties.MessageID).To(Equal(properties.MessageID))
}
if dc.message.Properties.Subject != nil {
Expect(dc.message.Properties.Subject).To(Equal(properties.Subject))
}
if dc.message.Properties.ReplyTo != nil {
Expect(dc.message.Properties.ReplyTo).To(Equal(properties.ReplyTo))
}
if dc.message.Properties.ContentType != nil {
Expect(dc.message.Properties.ContentType).To(Equal(properties.ContentType))
}
if dc.message.Properties.ContentEncoding != nil {
Expect(dc.message.Properties.ContentEncoding).To(Equal(properties.ContentEncoding))
}
if dc.message.Properties.GroupID != nil {
Expect(dc.message.Properties.GroupID).To(Equal(properties.GroupID))
}
if dc.message.Properties.ReplyToGroupID != nil {
Expect(dc.message.Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
}
if dc.message.Properties.GroupSequence != nil {
Expect(dc.message.Properties.GroupSequence).To(Equal(properties.GroupSequence))
}
if dc.message.Properties.ReplyToGroupID != nil {
Expect(dc.message.Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
}
// here we compare only the year, month and day
// it is not perfect but it is enough for the test
if dc.message.Properties.CreationTime != nil {
Expect(dc.message.Properties.CreationTime.Year()).To(Equal(properties.CreationTime.Year()))
Expect(dc.message.Properties.CreationTime.Month()).To(Equal(properties.CreationTime.Month()))
Expect(dc.message.Properties.CreationTime.Day()).To(Equal(properties.CreationTime.Day()))
}
if dc.message.Properties.AbsoluteExpiryTime != nil {
Expect(dc.message.Properties.AbsoluteExpiryTime.Year()).To(Equal(properties.AbsoluteExpiryTime.Year()))
Expect(dc.message.Properties.AbsoluteExpiryTime.Month()).To(Equal(properties.AbsoluteExpiryTime.Month()))
Expect(dc.message.Properties.AbsoluteExpiryTime.Day()).To(Equal(properties.AbsoluteExpiryTime.Day()))
}
if dc.message.Properties.CorrelationID != nil {
Expect(dc.message.Properties.CorrelationID).To(Equal(properties.CorrelationID))
}
Expect(dc.Accept(context.Background())).To(BeNil())
}
Expect(consumer.Close(context.Background())).To(BeNil())
wg.Done()
},
Entry("MessageID", &amqp.MessageProperties{MessageID: "MessageID"}, "MessageID"),
Entry("Subject", &amqp.MessageProperties{Subject: stringPtr("Subject")}, "Subject"),
Entry("ReplyTo", &amqp.MessageProperties{ReplyTo: stringPtr("ReplyTo")}, "ReplyTo"),
Entry("ContentType", &amqp.MessageProperties{ContentType: stringPtr("ContentType")}, "ContentType"),
Entry("ContentEncoding", &amqp.MessageProperties{ContentEncoding: stringPtr("ContentEncoding")}, "ContentEncoding"),
Entry("GroupID", &amqp.MessageProperties{GroupID: stringPtr("GroupID")}, "GroupID"),
Entry("ReplyToGroupID", &amqp.MessageProperties{ReplyToGroupID: stringPtr("ReplyToGroupID")}, "ReplyToGroupID"),
Entry("GroupSequence", &amqp.MessageProperties{GroupSequence: uint32Ptr(137)}, "GroupSequence"),
Entry("ReplyToGroupID", &amqp.MessageProperties{ReplyToGroupID: stringPtr("ReplyToGroupID")}, "ReplyToGroupID"),
Entry("CreationTime", &amqp.MessageProperties{CreationTime: timePtr(createDateTime())}, "CreationTime"),
Entry("AbsoluteExpiryTime", &amqp.MessageProperties{AbsoluteExpiryTime: timePtr(createDateTime())}, "AbsoluteExpiryTime"),
Entry("CorrelationID", &amqp.MessageProperties{CorrelationID: "CorrelationID"}, "CorrelationID"),
)
go func() {
wg.Wait()
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
}()
})
})
type msgLogic = func(*amqp.Message)
func publishMessagesWithMessageLogic(queue string, label string, count int, logic msgLogic) {
conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil)
Expect(err).To(BeNil())
publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queue},
nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
for i := 0; i < count; i++ {
body := fmt.Sprintf("Message_id:%d_label:%s", i, label)
msg := NewMessage([]byte(body))
logic(msg)
publishResult, err := publisher.Publish(context.TODO(), msg)
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
}
err = conn.Close(context.TODO())
Expect(err).To(BeNil())
}

View File

@ -173,9 +173,9 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
return make(map[string]any), nil
}
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification QueueSpecification) (*AmqpQueueInfo, error) {
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueSpecification) (*AmqpQueueInfo, error) {
if specification == nil {
return nil, fmt.Errorf("queue specification cannot be nil. You need to provide a valid QueueSpecification")
return nil, fmt.Errorf("queue specification cannot be nil. You need to provide a valid IQueueSpecification")
}
amqpQueue := newAmqpQueue(a, specification.name())
@ -192,9 +192,9 @@ func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error {
return q.Delete(ctx)
}
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification ExchangeSpecification) (*AmqpExchangeInfo, error) {
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification IExchangeSpecification) (*AmqpExchangeInfo, error) {
if exchangeSpecification == nil {
return nil, errors.New("exchange specification cannot be nil. You need to provide a valid ExchangeSpecification")
return nil, errors.New("exchange specification cannot be nil. You need to provide a valid IExchangeSpecification")
}
exchange := newAmqpExchange(a, exchangeSpecification.name())
@ -208,9 +208,9 @@ func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
return e.Delete(ctx)
}
func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification BindingSpecification) (string, error) {
func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBindingSpecification) (string, error) {
if bindingSpecification == nil {
return "", fmt.Errorf("binding specification cannot be nil. You need to provide a valid BindingSpecification")
return "", fmt.Errorf("binding specification cannot be nil. You need to provide a valid IBindingSpecification")
}
bind := newAMQPBinding(a)
@ -220,9 +220,9 @@ func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification BindingS
return bind.Bind(ctx)
}
func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error {
func (a *AmqpManagement) Unbind(ctx context.Context, path string) error {
bind := newAMQPBinding(a)
return bind.Unbind(ctx, bindingPath)
return bind.Unbind(ctx, path)
}
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error) {
path, err := queueAddress(&queueName)
@ -236,8 +236,10 @@ func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*Amqp
return newAmqpQueueInfo(result), nil
}
func (a *AmqpManagement) PurgeQueue(ctx context.Context, queueName string) (int, error) {
purge := newAmqpQueue(a, queueName)
// PurgeQueue purges the queue
// returns the number of messages purged
func (a *AmqpManagement) PurgeQueue(ctx context.Context, name string) (int, error) {
purge := newAmqpQueue(a, name)
return purge.Purge(ctx)
}
@ -245,6 +247,6 @@ func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged) {
a.lifeCycle.chStatusChanged = channel
}
func (a *AmqpManagement) State() LifeCycleState {
func (a *AmqpManagement) State() ILifeCycleState {
return a.lifeCycle.State()
}

View File

@ -49,15 +49,19 @@ var _ = Describe("Management tests", func() {
management := connection.Management()
kv := make(map[string]any)
kv["durable"] = true
kv["auto_delete"] = false
kv["auto_delete"] = true
_queueArguments := make(map[string]any)
_queueArguments["x-queue-type"] = "quorum"
_queueArguments["x-queue-type"] = "classic"
kv["arguments"] = _queueArguments
path := "/queues/test"
result, err := management.Request(context.Background(), kv, path, "PUT", []int{200})
Expect(err).To(BeNil())
Expect(result).NotTo(BeNil())
result, err = management.Request(context.Background(), amqp.Null{}, path, "DELETE", []int{responseCode200})
Expect(err).To(BeNil())
Expect(result).NotTo(BeNil())
Expect(management.Close(context.Background())).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})

View File

@ -26,13 +26,13 @@ func (m *Publisher) Id() string {
return m.id
}
func newPublisher(ctx context.Context, connection *AmqpConnection, destinationAdd string, linkName string, args ...string) (*Publisher, error) {
func newPublisher(ctx context.Context, connection *AmqpConnection, destinationAdd string, options IPublisherOptions) (*Publisher, error) {
id := fmt.Sprintf("publisher-%s", uuid.New().String())
if len(args) > 0 {
id = args[0]
if options != nil && options.id() != "" {
id = options.id()
}
r := &Publisher{connection: connection, linkName: linkName, destinationAdd: destinationAdd, id: id}
r := &Publisher{connection: connection, linkName: getLinkName(options), destinationAdd: destinationAdd, id: id}
connection.entitiesTracker.storeOrReplaceProducer(r)
err := r.createSender(ctx)
if err != nil {

View File

@ -18,7 +18,7 @@ var _ = Describe("AMQP publisher ", func() {
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, "test")
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
Expect(publisher).To(BeAssignableToTypeOf(&Publisher{}))
@ -40,7 +40,7 @@ var _ = Describe("AMQP publisher ", func() {
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
exchangeName := "Nope"
publisher, err := connection.NewPublisher(context.Background(), &ExchangeAddress{Exchange: exchangeName}, "test")
publisher, err := connection.NewPublisher(context.Background(), &ExchangeAddress{Exchange: exchangeName}, nil)
Expect(err).NotTo(BeNil())
Expect(publisher).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
@ -62,7 +62,7 @@ var _ = Describe("AMQP publisher ", func() {
publisher, err := connection.NewPublisher(context.Background(), &ExchangeAddress{
Exchange: eName,
Key: routingKeyNope,
}, "test")
}, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
publishResult, err := publisher.Publish(context.Background(), NewMessage([]byte("hello")))
@ -82,7 +82,7 @@ var _ = Describe("AMQP publisher ", func() {
Name: qName,
})
Expect(err).To(BeNil())
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, "test")
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
publishResult, err := publisher.Publish(context.Background(), NewMessage([]byte("hello")))
@ -100,7 +100,7 @@ var _ = Describe("AMQP publisher ", func() {
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
publisher, err := connection.NewPublisher(context.Background(), nil, "test")
publisher, err := connection.NewPublisher(context.Background(), nil, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
qName := generateNameWithDateTime("Targets NewPublisher should fail when the destination does not exist")
@ -125,7 +125,7 @@ var _ = Describe("AMQP publisher ", func() {
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
Expect(err).To(BeNil())
publisher, err := connection.NewPublisher(context.Background(), nil, "test")
publisher, err := connection.NewPublisher(context.Background(), nil, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
name := generateNameWithDateTime("Targets NewPublisher should success with StateReceived when the destination exists")
@ -180,7 +180,7 @@ var _ = Describe("AMQP publisher ", func() {
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
publisher, err := connection.NewPublisher(context.Background(), nil, "test")
publisher, err := connection.NewPublisher(context.Background(), nil, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
msg := NewMessage([]byte("hello"))

View File

@ -8,14 +8,16 @@ import (
)
type AmqpQueueInfo struct {
name string
isDurable bool
isAutoDelete bool
isExclusive bool
leader string
members []string
arguments map[string]any
queueType TQueueType
name string
isDurable bool
isAutoDelete bool
isExclusive bool
leader string
members []string
arguments map[string]any
queueType TQueueType
consumerCount uint32
messageCount uint64
}
func (a *AmqpQueueInfo) Leader() string {
@ -28,14 +30,16 @@ func (a *AmqpQueueInfo) Members() []string {
func newAmqpQueueInfo(response map[string]any) *AmqpQueueInfo {
return &AmqpQueueInfo{
name: response["name"].(string),
isDurable: response["durable"].(bool),
isAutoDelete: response["auto_delete"].(bool),
isExclusive: response["exclusive"].(bool),
queueType: TQueueType(response["type"].(string)),
leader: response["leader"].(string),
members: response["replicas"].([]string),
arguments: response["arguments"].(map[string]any),
name: response["name"].(string),
isDurable: response["durable"].(bool),
isAutoDelete: response["auto_delete"].(bool),
isExclusive: response["exclusive"].(bool),
queueType: TQueueType(response["type"].(string)),
leader: response["leader"].(string),
members: response["replicas"].([]string),
arguments: response["arguments"].(map[string]any),
consumerCount: response["consumer_count"].(uint32),
messageCount: response["message_count"].(uint64),
}
}
@ -63,6 +67,14 @@ func (a *AmqpQueueInfo) Arguments() map[string]any {
return a.arguments
}
func (a *AmqpQueueInfo) ConsumerCount() uint32 {
return a.consumerCount
}
func (a *AmqpQueueInfo) MessageCount() uint64 {
return a.messageCount
}
type AmqpQueue struct {
management *AmqpManagement
arguments map[string]any

View File

@ -69,6 +69,8 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsAutoDelete()).To(BeTrue())
Expect(queueInfo.IsExclusive()).To(BeTrue())
Expect(queueInfo.Type()).To(Equal(Classic))
Expect(queueInfo.messageCount).To(BeZero())
Expect(queueInfo.consumerCount).To(BeZero())
Expect(queueInfo.Leader()).To(ContainSubstring("rabbit"))
Expect(len(queueInfo.Members())).To(BeNumerically(">", 0))
@ -245,7 +247,7 @@ func publishMessages(queueName string, count int, args ...string) {
conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil)
Expect(err).To(BeNil())
publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queueName}, "test")
publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queueName}, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())

View File

@ -13,20 +13,20 @@ type StateRejected = amqp.StateRejected
type StateReleased = amqp.StateReleased
type StateModified = amqp.StateModified
type linkerName interface {
type iLinkerName interface {
linkName() string
}
func getLinkName(l linkerName) string {
func getLinkName(l iLinkerName) string {
if l == nil || l.linkName() == "" {
return uuid.New().String()
}
return l.linkName()
}
/// ConsumerOptions interface for the AMQP and Stream consumer///
/// IConsumerOptions interface for the AMQP and Stream consumer///
type ConsumerOptions interface {
type IConsumerOptions interface {
// linkName returns the name of the link
// if not set it will return a random UUID
linkName() string
@ -37,16 +37,19 @@ type ConsumerOptions interface {
// linkFilters returns the link filters for the link.
// It is mostly used for the stream consumers.
linkFilters() []amqp.LinkFilter
// id returns the id of the consumer
id() string
}
func getInitialCredits(co ConsumerOptions) int32 {
func getInitialCredits(co IConsumerOptions) int32 {
if co == nil || co.initialCredits() == 0 {
return 256
}
return co.initialCredits()
}
func getLinkFilters(co ConsumerOptions) []amqp.LinkFilter {
func getLinkFilters(co IConsumerOptions) []amqp.LinkFilter {
if co == nil {
return nil
}
@ -69,32 +72,46 @@ func (mo *managementOptions) linkFilters() []amqp.LinkFilter {
return nil
}
type AMQPConsumerOptions struct {
//ReceiverLinkName: see the ConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the ConsumerOptions interface
InitialCredits int32
func (mo *managementOptions) id() string {
return "management"
}
func (aco *AMQPConsumerOptions) linkName() string {
// ConsumerOptions represents the options for quorum and classic queues
type ConsumerOptions struct {
//ReceiverLinkName: see the IConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the IConsumerOptions interface
InitialCredits int32
// The id of the consumer
Id string
}
func (aco *ConsumerOptions) linkName() string {
return aco.ReceiverLinkName
}
func (aco *AMQPConsumerOptions) initialCredits() int32 {
func (aco *ConsumerOptions) initialCredits() int32 {
return aco.InitialCredits
}
func (aco *AMQPConsumerOptions) linkFilters() []amqp.LinkFilter {
func (aco *ConsumerOptions) linkFilters() []amqp.LinkFilter {
return nil
}
type OffsetSpecification interface {
func (aco *ConsumerOptions) id() string {
return aco.Id
}
type IOffsetSpecification interface {
toLinkFilter() amqp.LinkFilter
}
const rmqStreamFilter = "rabbitmq:stream-filter"
const rmqStreamOffsetSpec = "rabbitmq:stream-offset-spec"
const rmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered"
const amqpApplicationPropertiesFilter = "amqp:application-properties-filter"
const amqpPropertiesFilter = "amqp:properties-filter"
const offsetFirst = "first"
const offsetNext = "next"
const offsetLast = "last"
@ -128,23 +145,37 @@ func (on *OffsetNext) toLinkFilter() amqp.LinkFilter {
return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetNext)
}
// StreamFilterOptions represents the options that can be used to filter the stream data.
// It is used in the StreamConsumerOptions.
// See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions/
type StreamFilterOptions struct {
// Filter values.
Values []string
//
MatchUnfiltered bool
// Filter the data based on Application Property
ApplicationProperties map[string]any
// Filter the data based on Message Properties
Properties *amqp.MessageProperties
}
/*
StreamConsumerOptions represents the options that can be used to create a stream consumer.
StreamConsumerOptions represents the options for stream queues
It is mandatory in case of creating a stream consumer.
*/
type StreamConsumerOptions struct {
//ReceiverLinkName: see the ConsumerOptions interface
//ReceiverLinkName: see the IConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the ConsumerOptions interface
//InitialCredits: see the IConsumerOptions interface
InitialCredits int32
// The offset specification for the stream consumer
// see the interface implementations
Offset OffsetSpecification
// Filter values.
// See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions for more details
Filters []string
//
FilterMatchUnfiltered bool
Offset IOffsetSpecification
StreamFilterOptions *StreamFilterOptions
Id string
}
func (sco *StreamConsumerOptions) linkName() string {
@ -157,29 +188,108 @@ func (sco *StreamConsumerOptions) initialCredits() int32 {
func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter {
var filters []amqp.LinkFilter
filters = append(filters, sco.Offset.toLinkFilter())
if sco.Filters != nil {
l := []any{}
for _, f := range sco.Filters {
if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Values != nil {
var l []any
for _, f := range sco.StreamFilterOptions.Values {
l = append(l, f)
}
filters = append(filters, amqp.NewLinkFilter(rmqStreamFilter, 0, l))
filters = append(filters, amqp.NewLinkFilter(rmqStreamMatchUnfiltered, 0, sco.FilterMatchUnfiltered))
filters = append(filters, amqp.NewLinkFilter(rmqStreamMatchUnfiltered, 0, sco.StreamFilterOptions.MatchUnfiltered))
}
if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.ApplicationProperties != nil {
l := map[string]any{}
for k, v := range sco.StreamFilterOptions.ApplicationProperties {
l[k] = v
}
filters = append(filters, amqp.NewLinkFilter(amqpApplicationPropertiesFilter, 0, l))
}
if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Properties != nil {
l := map[amqp.Symbol]any{}
if sco.StreamFilterOptions.Properties.ContentType != nil {
l["content-type"] = amqp.Symbol(*sco.StreamFilterOptions.Properties.ContentType)
}
if sco.StreamFilterOptions.Properties.ContentEncoding != nil {
l["content-encoding"] = amqp.Symbol(*sco.StreamFilterOptions.Properties.ContentEncoding)
}
if sco.StreamFilterOptions.Properties.CorrelationID != nil {
l["correlation-id"] = sco.StreamFilterOptions.Properties.CorrelationID
}
if sco.StreamFilterOptions.Properties.MessageID != nil {
l["message-id"] = sco.StreamFilterOptions.Properties.MessageID
}
if sco.StreamFilterOptions.Properties.Subject != nil {
l["subject"] = *sco.StreamFilterOptions.Properties.Subject
}
if sco.StreamFilterOptions.Properties.ReplyTo != nil {
l["reply-to"] = *sco.StreamFilterOptions.Properties.ReplyTo
}
if sco.StreamFilterOptions.Properties.To != nil {
l["to"] = *sco.StreamFilterOptions.Properties.To
}
if sco.StreamFilterOptions.Properties.GroupID != nil {
l["group-id"] = *sco.StreamFilterOptions.Properties.GroupID
}
if sco.StreamFilterOptions.Properties.UserID != nil {
l["user-id"] = sco.StreamFilterOptions.Properties.UserID
}
if sco.StreamFilterOptions.Properties.AbsoluteExpiryTime != nil {
l["absolute-expiry-time"] = sco.StreamFilterOptions.Properties.AbsoluteExpiryTime
}
if sco.StreamFilterOptions.Properties.CreationTime != nil {
l["creation-time"] = sco.StreamFilterOptions.Properties.CreationTime
}
if sco.StreamFilterOptions.Properties.GroupSequence != nil {
l["group-sequence"] = *sco.StreamFilterOptions.Properties.GroupSequence
}
if sco.StreamFilterOptions.Properties.ReplyToGroupID != nil {
l["reply-to-group-id"] = *sco.StreamFilterOptions.Properties.ReplyToGroupID
}
if len(l) > 0 {
filters = append(filters, amqp.NewLinkFilter(amqpPropertiesFilter, 0, l))
}
}
return filters
}
///// ProducerOptions /////
type ProducerOptions interface {
linkName() string
func (sco *StreamConsumerOptions) id() string {
return sco.Id
}
type AMQPProducerOptions struct {
///// PublisherOptions /////
type IPublisherOptions interface {
linkName() string
id() string
}
type PublisherOptions struct {
Id string
SenderLinkName string
}
func (apo *AMQPProducerOptions) linkName() string {
func (apo *PublisherOptions) linkName() string {
return apo.SenderLinkName
}
func (apo *PublisherOptions) id() string {
return apo.Id
}

View File

@ -41,7 +41,7 @@ func createSenderLinkOptions(address string, linkName string, deliveryMode int)
// receiverLinkOptions returns the options for a receiver link
// with the given address and link name.
// That should be the same for all the links.
func createReceiverLinkOptions(address string, options ConsumerOptions, deliveryMode int) *amqp.ReceiverOptions {
func createReceiverLinkOptions(address string, options IConsumerOptions, deliveryMode int) *amqp.ReceiverOptions {
prop := make(map[string]any)
prop["paired"] = true
receiverSettleMode := amqp.SenderSettleModeSettled.Ptr()

View File

@ -1,6 +1,6 @@
package rabbitmqamqp
type entityIdentifier interface {
type iEntityIdentifier interface {
Id() string
}
@ -21,9 +21,9 @@ func (e QueueType) String() string {
}
/*
QueueSpecification represents the specification of a queue
IQueueSpecification represents the specification of a queue
*/
type QueueSpecification interface {
type IQueueSpecification interface {
name() string
isAutoDelete() bool
isExclusive() bool
@ -31,7 +31,7 @@ type QueueSpecification interface {
buildArguments() map[string]any
}
type OverflowStrategy interface {
type IOverflowStrategy interface {
overflowStrategy() string
}
@ -56,7 +56,7 @@ func (r *RejectPublishDlxOverflowStrategy) overflowStrategy() string {
return "reject-publish-dlx"
}
type LeaderLocator interface {
type ILeaderLocator interface {
leaderLocator() string
}
@ -79,18 +79,19 @@ QuorumQueueSpecification represents the specification of the quorum queue
*/
type QuorumQueueSpecification struct {
Name string
AutoExpire int64
MessageTTL int64
OverflowStrategy OverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
DeliveryLimit int64
TargetClusterSize int64
LeaderLocator LeaderLocator
Name string
AutoExpire int64
MessageTTL int64
OverflowStrategy IOverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
DeliveryLimit int64
TargetClusterSize int64
LeaderLocator ILeaderLocator
QuorumInitialGroupSize int
}
func (q *QuorumQueueSpecification) name() string {
@ -155,6 +156,10 @@ func (q *QuorumQueueSpecification) buildArguments() map[string]any {
result["x-queue-leader-locator"] = q.LeaderLocator.leaderLocator()
}
if q.QuorumInitialGroupSize != 0 {
result["x-quorum-initial-group-size"] = q.QuorumInitialGroupSize
}
result["x-queue-type"] = q.queueType().String()
return result
}
@ -168,14 +173,14 @@ type ClassicQueueSpecification struct {
IsExclusive bool
AutoExpire int64
MessageTTL int64
OverflowStrategy OverflowStrategy
OverflowStrategy IOverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
MaxPriority int64
LeaderLocator LeaderLocator
LeaderLocator ILeaderLocator
}
func (q *ClassicQueueSpecification) name() string {
@ -344,8 +349,8 @@ func (e ExchangeType) String() string {
return string(e.Type)
}
// ExchangeSpecification represents the specification of an exchange
type ExchangeSpecification interface {
// IExchangeSpecification represents the specification of an exchange
type IExchangeSpecification interface {
name() string
isAutoDelete() bool
exchangeType() ExchangeType
@ -465,7 +470,7 @@ func (c *CustomExchangeSpecification) arguments() map[string]any {
// / **** Binding ****
type BindingSpecification interface {
type IBindingSpecification interface {
sourceExchange() string
destination() string
bindingKey() string

View File

@ -5,7 +5,7 @@ import (
"sync"
)
type LifeCycleState interface {
type ILifeCycleState interface {
getState() int
}
@ -49,7 +49,7 @@ const (
closed = iota
)
func statusToString(status LifeCycleState) string {
func statusToString(status ILifeCycleState) string {
switch status.getState() {
case open:
return "open"
@ -65,8 +65,8 @@ func statusToString(status LifeCycleState) string {
}
type StateChanged struct {
From LifeCycleState
To LifeCycleState
From ILifeCycleState
To ILifeCycleState
}
func (s StateChanged) String() string {
@ -85,7 +85,7 @@ func (s StateChanged) String() string {
}
type LifeCycle struct {
state LifeCycleState
state ILifeCycleState
chStatusChanged chan *StateChanged
mutex *sync.Mutex
}
@ -97,13 +97,13 @@ func NewLifeCycle() *LifeCycle {
}
}
func (l *LifeCycle) State() LifeCycleState {
func (l *LifeCycle) State() ILifeCycleState {
l.mutex.Lock()
defer l.mutex.Unlock()
return l.state
}
func (l *LifeCycle) SetState(value LifeCycleState) {
func (l *LifeCycle) SetState(value ILifeCycleState) {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.state == value {

View File

@ -8,7 +8,7 @@ import (
// MessagePropertyToAddress sets the To property of the message to the address of the target.
// The target must be a QueueAddress or an ExchangeAddress.
// Note: The field msgRef.Properties.To will be overwritten if it is already set.
func MessagePropertyToAddress(msgRef *amqp.Message, target TargetAddress) error {
func MessagePropertyToAddress(msgRef *amqp.Message, target ITargetAddress) error {
if target == nil {
return errors.New("target cannot be nil")
}
@ -33,7 +33,7 @@ func NewMessage(body []byte) *amqp.Message {
// NewMessageWithAddress creates a new AMQP 1.0 new message with the given payload and sets the To property to the address of the target.
// The target must be a QueueAddress or an ExchangeAddress.
// This function is a helper that combines NewMessage and MessagePropertyToAddress.
func NewMessageWithAddress(body []byte, target TargetAddress) (*amqp.Message, error) {
func NewMessageWithAddress(body []byte, target ITargetAddress) (*amqp.Message, error) {
message := amqp.NewMessage(body)
err := MessagePropertyToAddress(message, target)
if err != nil {

View File

@ -8,5 +8,30 @@ import (
func generateNameWithDateTime(name string) string {
return fmt.Sprintf("%s_%s", name, strconv.FormatInt(time.Now().Unix(), 10))
}
// Helper function to create string pointers
func stringPtr(s string) *string {
return &s
}
func uint32Ptr(i uint32) *uint32 {
return &i
}
// create a static date time string for testing
func createDateTime() time.Time {
layout := time.RFC3339
value := "2006-01-02T15:04:05Z"
t, err := time.Parse(layout, value)
if err != nil {
panic(err)
}
return t
}
// convert time to pointer
func timePtr(t time.Time) *time.Time {
return &t
}