remove builders (#15)

* remove builders
---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2024-11-15 08:37:28 +01:00 committed by GitHub
parent 5fc29f4968
commit 60e006b2a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 688 additions and 506 deletions

View File

@ -1,93 +1,87 @@
package main
import (
"bufio"
"context"
"fmt"
mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"os"
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"time"
)
func main() {
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
chStatusChanged := make(chan *mq.StatusChanged, 1)
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
go func(ch chan *mq.StatusChanged) {
go func(ch chan *rabbitmq_amqp.StatusChanged) {
for statusChanged := range ch {
fmt.Printf("Status changed from %d to %d\n", statusChanged.From, statusChanged.To)
fmt.Printf("%s\n", statusChanged)
}
}(chStatusChanged)
amqpConnection := mq.NewAmqpConnection()
amqpConnection.NotifyStatusChange(chStatusChanged)
err := amqpConnection.Open(context.Background(), mq.NewConnectionSettings())
amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged)
err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings())
if err != nil {
fmt.Printf("Error opening connection: %v\n", err)
return
}
fmt.Printf("AMQP Connection opened.\n")
management := amqpConnection.Management()
queueSpec := management.Queue("getting_started_queue").
QueueType(mq.QueueType{Type: mq.Quorum}).
MaxLengthBytes(mq.CapacityGB(1))
exchangeSpec := management.Exchange("getting_started_exchange").
ExchangeType(mq.ExchangeType{Type: mq.Topic})
queueInfo, err := queueSpec.Declare(context.Background())
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
Name: "getting-started-exchange",
})
if err != nil {
fmt.Printf("Error declaring queue %s\n", err)
return
}
fmt.Printf("Queue %s created.\n", queueInfo.GetName())
exchangeInfo, err := exchangeSpec.Declare(context.Background())
if err != nil {
fmt.Printf("Error declaring exchange %s\n", err)
return
}
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName())
bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key")
err = bindingSpec.Bind(context.Background())
if err != nil {
fmt.Printf("Error binding %s\n", err)
fmt.Printf("Error declaring exchange: %v\n", err)
return
}
fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName())
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
Name: "getting-started-queue",
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
})
fmt.Println("Press any key to cleanup and exit")
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadString('\n')
err = bindingSpec.Unbind(context.Background())
if err != nil {
fmt.Printf("Error unbinding %s\n", err)
fmt.Printf("Error declaring queue: %v\n", err)
return
}
fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName())
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
SourceExchange: exchangeInfo.Name(),
DestinationQueue: queueInfo.Name(),
BindingKey: "routing-key",
})
err = exchangeSpec.Delete(context.Background())
if err != nil {
fmt.Printf("Error deleting exchange %s\n", err)
fmt.Printf("Error binding: %v\n", err)
return
}
err = queueSpec.Delete(context.Background())
err = management.Unbind(context.TODO(), bindingPath)
if err != nil {
fmt.Printf("Error unbinding: %v\n", err)
return
}
err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
if err != nil {
fmt.Printf("Error deleting exchange: %v\n", err)
return
}
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error deleting queue: %v\n", err)
return
}
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName())
err = amqpConnection.Close(context.Background())
if err != nil {
fmt.Printf("Error closing connection: %v\n", err)
return
}
fmt.Printf("AMQP Connection closed.\n")
// Wait for the status change to be printed
time.Sleep(500 * time.Millisecond)
close(chStatusChanged)
}

View File

@ -0,0 +1,125 @@
package rabbitmq_amqp
import (
"errors"
"fmt"
"net/url"
"strings"
)
type AddressBuilder struct {
queue *string
exchange *string
key *string
append *string
}
func NewAddressBuilder() *AddressBuilder {
return &AddressBuilder{}
}
func (a *AddressBuilder) Queue(queue string) *AddressBuilder {
a.queue = &queue
return a
}
func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder {
a.exchange = &exchange
return a
}
func (a *AddressBuilder) Key(key string) *AddressBuilder {
a.key = &key
return a
}
func (a *AddressBuilder) Append(append string) *AddressBuilder {
a.append = &append
return a
}
func (a *AddressBuilder) Address() (string, error) {
if a.exchange == nil && a.queue == nil {
return "", errors.New("exchange or queue must be set")
}
urlAppend := ""
if !isStringNilOrEmpty(a.append) {
urlAppend = *a.append
}
if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) {
return "", errors.New("exchange and queue cannot be set together")
}
if !isStringNilOrEmpty(a.exchange) {
if !isStringNilOrEmpty(a.key) {
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil
}
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil
}
if a.queue == nil {
return "", nil
}
if isStringNilOrEmpty(a.queue) {
return "", errors.New("queue must be set")
}
return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil
}
// encodePathSegments takes a string and returns its percent-encoded representation.
func encodePathSegments(input string) string {
var encoded strings.Builder
// Iterate over each character in the input string
for _, char := range input {
// Check if the character is an unreserved character (i.e., it doesn't need encoding)
if isUnreserved(char) {
encoded.WriteRune(char) // Append as is
} else {
// Encode character To %HH format
encoded.WriteString(fmt.Sprintf("%%%02X", char))
}
}
return encoded.String()
}
// Decode takes a percent-encoded string and returns its decoded representation.
func decode(input string) (string, error) {
// Use url.QueryUnescape which properly decodes percent-encoded strings
decoded, err := url.QueryUnescape(input)
if err != nil {
return "", err
}
return decoded, nil
}
// isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
func isUnreserved(char rune) bool {
return (char >= 'A' && char <= 'Z') ||
(char >= 'a' && char <= 'z') ||
(char >= '0' && char <= '9') ||
char == '-' || char == '.' || char == '_' || char == '~'
}
func bindingPath() string {
return "/" + bindings
}
func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string {
sourceNameEncoded := encodePathSegments(sourceName)
destinationNameEncoded := encodePathSegments(destinationName)
keyEncoded := encodePathSegments(key)
destinationType := "dste"
if toQueue {
destinationType = "dstq"
}
format := "/%s/src=%s;%s=%s;key=%s;args="
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
}

View File

@ -0,0 +1,78 @@
package rabbitmq_amqp
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Address builder test ", func() {
It("With exchange, queue and key should raise and error", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("queue").Exchange("exchange").Key("key")
_, err := addressBuilder.Address()
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
})
It("Without exchange and queue should raise and error", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
_, err := addressBuilder.Address()
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange or queue must be set"))
})
It("With exchange and key should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_exchange").Key("my_key")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
})
It("With exchange should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_exchange")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange"))
})
It("With exchange and key with names to encode should return the encoded address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_ exchange/()").Key("my_key ")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
})
It("With queue should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("my_queue>")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue%3E"))
})
It("With queue and append should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("my_queue").Append("/messages")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue/messages"))
})
})

View File

@ -20,48 +20,36 @@ func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
return &AMQPBinding{management: management}
}
func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification {
func (b *AMQPBinding) BindingKey(bindingKey string) {
b.bindingKey = bindingKey
return b
}
func (b *AMQPBinding) SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification {
b.sourceName = exchangeSpec.GetName()
func (b *AMQPBinding) SourceExchange(sourceName string) {
if len(sourceName) > 0 {
b.sourceName = sourceName
b.toQueue = false
return b
}
}
func (b *AMQPBinding) SourceExchangeName(exchangeName string) IBindingSpecification {
b.sourceName = exchangeName
func (b *AMQPBinding) DestinationExchange(destinationName string) {
if len(destinationName) > 0 {
b.destinationName = destinationName
b.toQueue = false
return b
}
}
func (b *AMQPBinding) DestinationExchange(exchangeSpec IExchangeInfo) IBindingSpecification {
b.destinationName = exchangeSpec.GetName()
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationExchangeName(exchangeName string) IBindingSpecification {
b.destinationName = exchangeName
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification {
b.destinationName = queueSpec.GetName()
b.toQueue = true
return b
}
func (b *AMQPBinding) DestinationQueueName(queueName string) IBindingSpecification {
func (b *AMQPBinding) DestinationQueue(queueName string) {
if len(queueName) > 0 {
b.destinationName = queueName
b.toQueue = true
return b
}
}
func (b *AMQPBinding) Bind(ctx context.Context) error {
// Bind creates a binding between an exchange and a queue or exchange
// with the specified binding key.
// Returns the binding path that can be used to unbind the binding.
// Given a virtual host, the binding path is unique.
func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
path := bindingPath()
kv := make(map[string]any)
kv["binding_key"] = b.bindingKey
@ -69,11 +57,14 @@ func (b *AMQPBinding) Bind(ctx context.Context) error {
kv["destination_queue"] = b.destinationName
kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
return err
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
return bindingPathWithExchangeQueueKey, err
}
func (b *AMQPBinding) Unbind(ctx context.Context) error {
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
_, err := b.management.Request(ctx, amqp.Null{}, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204})
// Unbind removes a binding between an exchange and a queue or exchange
// with the specified binding key.
// The bindingPath is the unique path that was returned when the binding was created.
func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error {
_, err := b.management.Request(ctx, amqp.Null{}, bindingPath, commandDelete, []int{responseCode204})
return err
}

View File

@ -28,26 +28,30 @@ var _ = Describe("AMQP Bindings test ", func() {
It("AMQP Bindings between Exchange and Queue Should succeed", func() {
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue should uccess"
const queueName = "Queue_AMQP Bindings between Exchange and Queue should succeed"
exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
Name: exchangeName,
})
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
Expect(exchangeInfo.Name()).To(Equal(exchangeName))
queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO())
queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))
bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key")
err = bindingSpec.Bind(context.TODO())
Expect(queueInfo.Name()).To(Equal(queueName))
bindingPath, err := management.Bind(context.TODO(), &BindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: "routing-key",
})
Expect(err).To(BeNil())
err = bindingSpec.Unbind(context.TODO())
err = management.Unbind(context.TODO(), bindingPath)
Expect(err).To(BeNil())
err = exchangeSpec.Delete(context.TODO())
err = management.DeleteExchange(context.TODO(), exchangeName)
Expect(err).To(BeNil())
err = queueSpec.Delete(context.TODO())
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
})

View File

@ -2,131 +2,17 @@ package rabbitmq_amqp
import (
"context"
"crypto/tls"
"fmt"
"github.com/Azure/go-amqp"
)
type ConnectionSettings struct {
host string
port int
user string
password string
virtualHost string
scheme string
containerId string
useSsl bool
tlsConfig *tls.Config
saslMechanism TSaslMechanism
}
func (c *ConnectionSettings) GetSaslMechanism() TSaslMechanism {
return c.saslMechanism
}
func (c *ConnectionSettings) SaslMechanism(mechanism SaslMechanism) IConnectionSettings {
c.saslMechanism = mechanism.Type
return c
}
func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings {
c.tlsConfig = config
return c
}
func (c *ConnectionSettings) GetTlsConfig() *tls.Config {
return c.tlsConfig
}
func (c *ConnectionSettings) Port(port int) IConnectionSettings {
c.port = port
return c
}
func (c *ConnectionSettings) User(userName string) IConnectionSettings {
c.user = userName
return c
}
func (c *ConnectionSettings) Password(password string) IConnectionSettings {
c.password = password
return c
}
func (c *ConnectionSettings) VirtualHost(virtualHost string) IConnectionSettings {
c.virtualHost = virtualHost
return c
}
func (c *ConnectionSettings) ContainerId(containerId string) IConnectionSettings {
c.containerId = containerId
return c
}
func (c *ConnectionSettings) GetHost() string {
return c.host
}
func (c *ConnectionSettings) Host(hostName string) IConnectionSettings {
c.host = hostName
return c
}
func (c *ConnectionSettings) GetPort() int {
return c.port
}
func (c *ConnectionSettings) GetUser() string {
return c.user
}
func (c *ConnectionSettings) GetPassword() string {
return c.password
}
func (c *ConnectionSettings) GetVirtualHost() string {
return c.virtualHost
}
func (c *ConnectionSettings) GetScheme() string {
return c.scheme
}
func (c *ConnectionSettings) GetContainerId() string {
return c.containerId
}
func (c *ConnectionSettings) UseSsl(value bool) IConnectionSettings {
c.useSsl = value
if value {
c.scheme = "amqps"
} else {
c.scheme = "amqp"
}
return c
}
func (c *ConnectionSettings) IsSsl() bool {
return c.useSsl
}
func (c *ConnectionSettings) BuildAddress() string {
return c.scheme + "://" + c.host + ":" + fmt.Sprint(c.port)
}
func NewConnectionSettings() IConnectionSettings {
return &ConnectionSettings{
host: "localhost",
port: 5672,
user: "guest",
password: "guest",
virtualHost: "/",
scheme: "amqp",
containerId: "amqp-go-client",
useSsl: false,
tlsConfig: nil,
}
}
//func (c *ConnectionSettings) UseSsl(value bool) {
// c.UseSsl = value
// if value {
// c.Scheme = "amqps"
// } else {
// c.Scheme = "amqp"
// }
//}
type AmqpConnection struct {
Connection *amqp.Conn
@ -134,10 +20,15 @@ type AmqpConnection struct {
lifeCycle *LifeCycle
}
// Management returns the management interface for the connection.
// See IManagement interface.
func (a *AmqpConnection) Management() IManagement {
return a.management
}
// NewAmqpConnection creates a new AmqpConnection
// with a new AmqpManagement and a new LifeCycle.
// Returns a pointer to the new AmqpConnection
func NewAmqpConnection() IConnection {
return &AmqpConnection{
management: NewAmqpManagement(),
@ -145,20 +36,36 @@ func NewAmqpConnection() IConnection {
}
}
func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error {
// NewAmqpConnectionNotifyStatusChanged creates a new AmqpConnection
// with a new AmqpManagement and a new LifeCycle
// and sets the channel for status changes.
// Returns a pointer to the new AmqpConnection
func NewAmqpConnectionNotifyStatusChanged(channel chan *StatusChanged) IConnection {
lifeCycle := NewLifeCycle()
lifeCycle.chStatusChanged = channel
return &AmqpConnection{
management: NewAmqpManagement(),
lifeCycle: lifeCycle,
}
}
// Open opens a connection to the AMQP 1.0 server.
// using the provided connectionSettings and the AMQPLite library.
// Setups the connection and the management interface.
func (a *AmqpConnection) Open(ctx context.Context, connectionSettings *ConnectionSettings) error {
sASLType := amqp.SASLTypeAnonymous()
switch connectionSettings.GetSaslMechanism() {
switch connectionSettings.SaslMechanism {
case Plain:
sASLType = amqp.SASLTypePlain(connectionSettings.GetUser(), connectionSettings.GetPassword())
sASLType = amqp.SASLTypePlain(connectionSettings.User, connectionSettings.Password)
case External:
sASLType = amqp.SASLTypeExternal("")
}
conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{
ContainerID: connectionSettings.GetContainerId(),
ContainerID: connectionSettings.ContainerId,
SASLType: sASLType,
HostName: connectionSettings.GetVirtualHost(),
TLSConfig: connectionSettings.GetTlsConfig(),
HostName: connectionSettings.VirtualHost,
TLSConfig: connectionSettings.TlsConfig,
})
if err != nil {
return err
@ -188,6 +95,6 @@ func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) {
a.lifeCycle.chStatusChanged = channel
}
func (a *AmqpConnection) GetStatus() int {
func (a *AmqpConnection) Status() int {
return a.lifeCycle.Status()
}

View File

@ -15,7 +15,7 @@ var _ = Describe("AMQP Connection Test", func() {
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous})
connectionSettings.SaslMechanism = Anonymous
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
@ -32,7 +32,7 @@ var _ = Describe("AMQP Connection Test", func() {
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.SaslMechanism(SaslMechanism{Type: Plain})
connectionSettings.SaslMechanism = Plain
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())
@ -40,28 +40,32 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(err).To(BeNil())
})
It("AMQP Connection should fail due of wrong port", func() {
It("AMQP Connection should fail due of wrong Port", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
connectionSettings := &ConnectionSettings{
Host: "localhost",
Port: 1234,
}
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.Host("localhost").Port(1234)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fail due of wrong host", func() {
It("AMQP Connection should fail due of wrong Host", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
connectionSettings := &ConnectionSettings{
Host: "wronghost",
Port: 5672,
}
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.Host("wronghost").Port(5672)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).NotTo(BeNil())

View File

@ -13,7 +13,7 @@ func newAmqpExchangeInfo(name string) IExchangeInfo {
return &AmqpExchangeInfo{name: name}
}
func (a *AmqpExchangeInfo) GetName() string {
func (a *AmqpExchangeInfo) Name() string {
return a.name
}
@ -34,22 +34,24 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
}
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
path := exchangePath(e.name)
path, err := NewAddressBuilder().Exchange(e.name).Address()
if err != nil {
return nil, err
}
kv := make(map[string]any)
kv["auto_delete"] = e.isAutoDelete
kv["durable"] = true
kv["type"] = e.exchangeType.String()
kv["arguments"] = e.arguments
_, err := e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409})
_, err = e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409})
if err != nil {
return nil, err
}
return newAmqpExchangeInfo(e.name), nil
}
func (e *AmqpExchange) AutoDelete(isAutoDelete bool) IExchangeSpecification {
func (e *AmqpExchange) AutoDelete(isAutoDelete bool) {
e.isAutoDelete = isAutoDelete
return e
}
func (e *AmqpExchange) IsAutoDelete() bool {
@ -57,20 +59,24 @@ func (e *AmqpExchange) IsAutoDelete() bool {
}
func (e *AmqpExchange) Delete(ctx context.Context) error {
path := exchangePath(e.name)
_, err := e.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode204})
path, err := NewAddressBuilder().Exchange(e.name).Address()
if err != nil {
return err
}
_, err = e.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode204})
return err
}
func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) IExchangeSpecification {
func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) {
if len(exchangeType.Type) > 0 {
e.exchangeType = exchangeType
return e
}
}
func (e *AmqpExchange) GetExchangeType() TExchangeType {
return e.exchangeType.Type
}
func (e *AmqpExchange) GetName() string {
func (e *AmqpExchange) Name() string {
return e.name
}

View File

@ -28,34 +28,40 @@ var _ = Describe("AMQP Exchange test ", func() {
It("AMQP Exchange Declare with Default and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare and Delete with Default should succeed"
exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
Name: exchangeName,
})
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
err = exchangeSpec.Delete(context.TODO())
Expect(exchangeInfo.Name()).To(Equal(exchangeName))
err = management.DeleteExchange(context.TODO(), exchangeName)
Expect(err).To(BeNil())
})
It("AMQP Exchange Declare with Topic and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with Topic and Delete should succeed"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic})
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
Name: exchangeName,
ExchangeType: ExchangeType{Topic},
})
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
err = exchangeSpec.Delete(context.TODO())
Expect(exchangeInfo.Name()).To(Equal(exchangeName))
err = management.DeleteExchange(context.TODO(), exchangeName)
Expect(err).To(BeNil())
})
It("AMQP Exchange Declare with FanOut and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with FanOut and Delete should succeed"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut})
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
//exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut})
exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
Name: exchangeName,
ExchangeType: ExchangeType{FanOut},
})
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
err = exchangeSpec.Delete(context.TODO())
Expect(exchangeInfo.Name()).To(Equal(exchangeName))
err = management.DeleteExchange(context.TODO(), exchangeName)
Expect(err).To(BeNil())
})
})

View File

@ -22,14 +22,6 @@ type AmqpManagement struct {
cancel context.CancelFunc
}
func (a *AmqpManagement) Binding() IBindingSpecification {
return newAMQPBinding(a)
}
func (a *AmqpManagement) Exchange(exchangeName string) IExchangeSpecification {
return newAmqpExchange(a, exchangeName)
}
func NewAmqpManagement() *AmqpManagement {
return &AmqpManagement{
lifeCycle: NewLifeCycle(),
@ -201,12 +193,65 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
return make(map[string]any), nil
}
func (a *AmqpManagement) Queue(queueName string) IQueueSpecification {
return newAmqpQueue(a, queueName)
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error) {
var amqpQueue *AmqpQueue
if specification == nil || len(specification.Name) <= 0 {
// If the specification is nil or the name is empty, then we create a new queue
// with a random name with generateNameWithDefaultPrefix()
amqpQueue = newAmqpQueue(a, "")
} else {
amqpQueue = newAmqpQueue(a, specification.Name)
amqpQueue.AutoDelete(specification.IsAutoDelete)
amqpQueue.Exclusive(specification.IsExclusive)
amqpQueue.MaxLengthBytes(specification.MaxLengthBytes)
amqpQueue.DeadLetterExchange(specification.DeadLetterExchange)
amqpQueue.DeadLetterRoutingKey(specification.DeadLetterRoutingKey)
amqpQueue.QueueType(specification.QueueType)
}
return amqpQueue.Declare(ctx)
}
func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error {
q := newAmqpQueue(a, name)
return q.Delete(ctx)
}
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error) {
if exchangeSpecification == nil {
return nil, fmt.Errorf("exchangeSpecification is nil")
}
exchange := newAmqpExchange(a, exchangeSpecification.Name)
exchange.AutoDelete(exchangeSpecification.IsAutoDelete)
exchange.ExchangeType(exchangeSpecification.ExchangeType)
return exchange.Declare(ctx)
}
func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error {
e := newAmqpExchange(a, name)
return e.Delete(ctx)
}
func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification *BindingSpecification) (string, error) {
bind := newAMQPBinding(a)
bind.SourceExchange(bindingSpecification.SourceExchange)
bind.DestinationQueue(bindingSpecification.DestinationQueue)
bind.DestinationExchange(bindingSpecification.DestinationExchange)
bind.BindingKey(bindingSpecification.BindingKey)
return bind.Bind(ctx)
}
func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error {
bind := newAMQPBinding(a)
return bind.Unbind(ctx, bindingPath)
}
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) {
path := queuePath(queueName)
path, err := NewAddressBuilder().Queue(queueName).Address()
if err != nil {
return nil, err
}
result, err := a.Request(ctx, amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
if err != nil {
return nil, err
@ -214,14 +259,15 @@ func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueu
return newAmqpQueueInfo(result), nil
}
func (a *AmqpManagement) QueueClientName() IQueueSpecification {
return newAmqpQueue(a, "")
func (a *AmqpManagement) PurgeQueue(ctx context.Context, queueName string) (int, error) {
purge := newAmqpQueue(a, queueName)
return purge.Purge(ctx)
}
func (a *AmqpManagement) NotifyStatusChange(channel chan *StatusChanged) {
a.lifeCycle.chStatusChanged = channel
}
func (a *AmqpManagement) GetStatus() int {
func (a *AmqpManagement) Status() int {
return a.lifeCycle.Status()
}

View File

@ -20,14 +20,13 @@ var _ = Describe("Management tests", func() {
cancel()
err = amqpConnection.Management().Open(ctx, amqpConnection)
Expect(err).NotTo(BeNil())
amqpConnection.Close(context.Background())
Expect(amqpConnection.Close(context.Background())).To(BeNil())
})
It("AMQP Management should receive events", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
ch := make(chan *StatusChanged, 1)
amqpConnection.Management().NotifyStatusChange(ch)
amqpConnection := NewAmqpConnectionNotifyStatusChanged(ch)
Expect(amqpConnection).NotTo(BeNil())
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
Expect(err).To(BeNil())
recv := <-ch
@ -42,7 +41,7 @@ var _ = Describe("Management tests", func() {
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
amqpConnection.Close(context.Background())
Expect(amqpConnection.Close(context.Background())).To(BeNil())
})
It("Request", func() {
@ -68,7 +67,7 @@ var _ = Describe("Management tests", func() {
Expect(err).To(BeNil())
Expect(result).NotTo(BeNil())
Expect(management.Close(context.Background())).To(BeNil())
amqpConnection.Close(context.Background())
Expect(amqpConnection.Close(context.Background())).To(BeNil())
})
It("GET on non-existing queue returns ErrDoesNotExist", func() {

View File

@ -12,17 +12,17 @@ type AmqpQueueInfo struct {
isAutoDelete bool
isExclusive bool
leader string
replicas []string
members []string
arguments map[string]any
queueType TQueueType
}
func (a *AmqpQueueInfo) GetLeader() string {
func (a *AmqpQueueInfo) Leader() string {
return a.leader
}
func (a *AmqpQueueInfo) GetReplicas() []string {
return a.replicas
func (a *AmqpQueueInfo) Members() []string {
return a.members
}
func newAmqpQueueInfo(response map[string]any) IQueueInfo {
@ -33,7 +33,7 @@ func newAmqpQueueInfo(response map[string]any) IQueueInfo {
isExclusive: response["exclusive"].(bool),
queueType: TQueueType(response["type"].(string)),
leader: response["leader"].(string),
replicas: response["replicas"].([]string),
members: response["replicas"].([]string),
arguments: response["arguments"].(map[string]any),
}
}
@ -54,11 +54,11 @@ func (a *AmqpQueueInfo) Type() TQueueType {
return a.queueType
}
func (a *AmqpQueueInfo) GetName() string {
func (a *AmqpQueueInfo) Name() string {
return a.name
}
func (a *AmqpQueueInfo) GetArguments() map[string]any {
func (a *AmqpQueueInfo) Arguments() map[string]any {
return a.arguments
}
@ -70,24 +70,28 @@ type AmqpQueue struct {
name string
}
func (a *AmqpQueue) DeadLetterExchange(dlx string) IQueueSpecification {
func (a *AmqpQueue) DeadLetterExchange(dlx string) {
if len(dlx) != 0 {
a.arguments["x-dead-letter-exchange"] = dlx
return a
}
}
func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) IQueueSpecification {
func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) {
if len(dlrk) != 0 {
a.arguments["x-dead-letter-routing-key"] = dlrk
return a
}
}
func (a *AmqpQueue) MaxLengthBytes(length int64) IQueueSpecification {
func (a *AmqpQueue) MaxLengthBytes(length int64) {
if length != 0 {
a.arguments["max-length-bytes"] = length
return a
}
}
func (a *AmqpQueue) QueueType(queueType QueueType) IQueueSpecification {
func (a *AmqpQueue) QueueType(queueType QueueType) {
if len(queueType.String()) != 0 {
a.arguments["x-queue-type"] = queueType.String()
return a
}
}
func (a *AmqpQueue) GetQueueType() TQueueType {
@ -97,25 +101,23 @@ func (a *AmqpQueue) GetQueueType() TQueueType {
return TQueueType(a.arguments["x-queue-type"].(string))
}
func (a *AmqpQueue) Exclusive(isExclusive bool) IQueueSpecification {
func (a *AmqpQueue) Exclusive(isExclusive bool) {
a.isExclusive = isExclusive
return a
}
func (a *AmqpQueue) IsExclusive() bool {
return a.isExclusive
}
func (a *AmqpQueue) AutoDelete(isAutoDelete bool) IQueueSpecification {
func (a *AmqpQueue) AutoDelete(isAutoDelete bool) {
a.isAutoDelete = isAutoDelete
return a
}
func (a *AmqpQueue) IsAutoDelete() bool {
return a.isAutoDelete
}
func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecification {
func newAmqpQueue(management *AmqpManagement, queueName string) *AmqpQueue {
return &AmqpQueue{management: management,
name: queueName,
arguments: make(map[string]any)}
@ -135,7 +137,8 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
if Quorum == a.GetQueueType() ||
Stream == a.GetQueueType() {
// mandatory arguments for quorum queues and streams
a.Exclusive(false).AutoDelete(false)
a.Exclusive(false)
a.AutoDelete(false)
}
if err := a.validate(); err != nil {
@ -146,7 +149,10 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
a.name = generateNameWithDefaultPrefix()
}
path := queuePath(a.name)
path, err := NewAddressBuilder().Queue(a.name).Address()
if err != nil {
return nil, err
}
kv := make(map[string]any)
kv["durable"] = true
kv["auto_delete"] = a.isAutoDelete
@ -160,22 +166,24 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
}
func (a *AmqpQueue) Delete(ctx context.Context) error {
path := queuePath(a.name)
_, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200})
path, err := NewAddressBuilder().Queue(a.name).Address()
if err != nil {
return err
}
_, err = a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200})
return err
}
func (a *AmqpQueue) Purge(ctx context.Context) (int, error) {
path := queuePurgePath(a.name)
path, err := NewAddressBuilder().Queue(a.name).Append("/messages").Address()
if err != nil {
return 0, err
}
response, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200})
return int(response["message_count"].(uint64)), err
}
func (a *AmqpQueue) Name(queueName string) IQueueSpecification {
func (a *AmqpQueue) Name(queueName string) {
a.name = queueName
return a
}
func (a *AmqpQueue) GetName() string {
return a.name
}

View File

@ -31,11 +31,12 @@ var _ = Describe("AMQP Queue test ", func() {
It("AMQP Queue Declare With Response and Get/Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Response and Delete should succeed"
queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO())
queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.Name()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
@ -45,137 +46,160 @@ var _ = Describe("AMQP Queue test ", func() {
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))
err = queueSpec.Delete(context.TODO())
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
It("AMQP Queue Declare With Parameters and Get/Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Parameters and Delete should succeed"
queueSpec := management.Queue(queueName).Exclusive(true).
AutoDelete(true).
QueueType(QueueType{Classic}).
MaxLengthBytes(CapacityGB(1)).
DeadLetterExchange("dead-letter-exchange").
DeadLetterRoutingKey("dead-letter-routing-key")
queueInfo, err := queueSpec.Declare(context.TODO())
queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
IsAutoDelete: true,
IsExclusive: true,
QueueType: QueueType{Classic},
MaxLengthBytes: CapacityGB(1),
DeadLetterExchange: "dead-letter-exchange",
DeadLetterRoutingKey: "dead-letter-routing-key",
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.Name()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeTrue())
Expect(queueInfo.IsExclusive()).To(BeTrue())
Expect(queueInfo.Type()).To(Equal(Classic))
Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit"))
Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0))
Expect(queueInfo.Leader()).To(ContainSubstring("rabbit"))
Expect(len(queueInfo.Members())).To(BeNumerically(">", 0))
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-exchange", "dead-letter-exchange"))
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key"))
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000)))
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-dead-letter-exchange", "dead-letter-exchange"))
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key"))
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000)))
// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))
err = queueSpec.Delete(context.TODO())
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
It("AMQP Declare Quorum Queue and Get/Delete should succeed", func() {
const queueName = "AMQP Declare Quorum Queue and Delete should succeed"
// Quorum queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
queueSpec := management.Queue(queueName).
Exclusive(true).
AutoDelete(true).QueueType(QueueType{Quorum})
queueInfo, err := queueSpec.Declare(context.TODO())
queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
IsAutoDelete: true,
IsExclusive: true,
QueueType: QueueType{Quorum},
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.Name()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Quorum))
// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))
err = queueSpec.Delete(context.TODO())
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
It("AMQP Declare Stream Queue and Get/Delete should succeed", func() {
const queueName = "AMQP Declare Stream Queue and Delete should succeed"
// Stream queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
queueSpec := management.Queue(queueName).
Exclusive(true).
AutoDelete(true).QueueType(QueueType{Stream})
queueInfo, err := queueSpec.Declare(context.TODO())
queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
IsAutoDelete: true,
IsExclusive: true,
QueueType: QueueType{Stream},
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.Name()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Stream))
// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))
err = queueSpec.Delete(context.TODO())
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
It("AMQP Declare Queue with invalid type should fail", func() {
const queueName = "AMQP Declare Queue with invalid type should fail"
queueSpec := management.Queue(queueName).
QueueType(QueueType{Type: "invalid"})
_, err := queueSpec.Declare(context.TODO())
_, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
QueueType: QueueType{Type: "invalid"},
})
Expect(err).NotTo(BeNil())
})
It("AMQP Declare Queue should fail with Precondition fail", func() {
// The first queue is declared as Classic and it should succeed
// The second queue is declared as Quorum and it should fail since it is already declared as Classic
// The first queue is declared as Classic, and it should succeed
// The second queue is declared as Quorum, and it should fail since it is already declared as Classic
const queueName = "AMQP Declare Queue should fail with Precondition fail"
queueSpec := management.Queue(queueName).QueueType(QueueType{Classic})
_, err := queueSpec.Declare(context.TODO())
_, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
QueueType: QueueType{Classic},
})
Expect(err).To(BeNil())
queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum})
_, err = queueSpecFail.Declare(context.TODO())
_, err = management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
QueueType: QueueType{Quorum},
})
Expect(err).NotTo(BeNil())
Expect(err).To(Equal(ErrPreconditionFailed))
err = queueSpec.Delete(context.TODO())
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
It("AMQP Declare Queue should fail during validation", func() {
const queueName = "AMQP Declare Queue should fail during validation"
queueSpec := management.Queue(queueName).MaxLengthBytes(-1)
_, err := queueSpec.Declare(context.TODO())
_, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
MaxLengthBytes: -1,
})
Expect(err).NotTo(BeNil())
Expect(err).To(HaveOccurred())
})
It("AMQP Declare Queue should create client name queue", func() {
queueSpec := management.QueueClientName()
queueInfo, err := queueSpec.Declare(context.TODO())
queueInfo, err := management.DeclareQueue(context.TODO(), nil)
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(ContainSubstring("client.gen-"))
err = queueSpec.Delete(context.TODO())
Expect(queueInfo.Name()).To(ContainSubstring("client.gen-"))
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
Expect(err).To(BeNil())
})
It("AMQP Purge Queue should succeed and return the number of messages purged", func() {
const queueName = "AMQP Purge Queue should succeed and return the number of messages purged"
queueSpec := management.Queue(queueName)
_, err := queueSpec.Declare(context.TODO())
queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
})
Expect(err).To(BeNil())
publishMessages(queueName, 10)
purged, err := queueSpec.Purge(context.TODO())
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
Expect(err).To(BeNil())
Expect(purged).To(Equal(10))
})
@ -199,7 +223,13 @@ func publishMessages(queueName string, count int) {
if err != nil {
Fail(err.Error())
}
sender, err := session.NewSender(context.TODO(), queuePath(queueName), nil)
address, err := NewAddressBuilder().Queue(queueName).Address()
if err != nil {
Fail(err.Error())
}
sender, err := session.NewSender(context.TODO(), address, nil)
if err != nil {
Fail(err.Error())
}

View File

@ -4,10 +4,8 @@ import (
"crypto/md5"
"encoding/base64"
"fmt"
"net/url"
"strings"
"github.com/google/uuid"
"strings"
)
const (
@ -29,73 +27,6 @@ const (
bindings = "bindings"
)
// encodePathSegments takes a string and returns its percent-encoded representation.
func encodePathSegments(input string) string {
var encoded strings.Builder
// Iterate over each character in the input string
for _, char := range input {
// Check if the character is an unreserved character (i.e., it doesn't need encoding)
if isUnreserved(char) {
encoded.WriteRune(char) // Append as is
} else {
// Encode character To %HH format
encoded.WriteString(fmt.Sprintf("%%%02X", char))
}
}
return encoded.String()
}
// Decode takes a percent-encoded string and returns its decoded representation.
func decode(input string) (string, error) {
// Use url.QueryUnescape which properly decodes percent-encoded strings
decoded, err := url.QueryUnescape(input)
if err != nil {
return "", err
}
return decoded, nil
}
// isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
func isUnreserved(char rune) bool {
return (char >= 'A' && char <= 'Z') ||
(char >= 'a' && char <= 'z') ||
(char >= '0' && char <= '9') ||
char == '-' || char == '.' || char == '_' || char == '~'
}
func queuePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName)
}
func queuePurgePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName) + "/messages"
}
func exchangePath(exchangeName string) string {
return "/" + exchanges + "/" + encodePathSegments(exchangeName)
}
func bindingPath() string {
return "/" + bindings
}
func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string {
sourceNameEncoded := encodePathSegments(sourceName)
destinationNameEncoded := encodePathSegments(destinationName)
keyEncoded := encodePathSegments(key)
destinationType := "dste"
if toQueue {
destinationType = "dstq"
}
format := "/%s/src=%s;%s=%s;key=%s;args="
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
}
func validatePositive(label string, value int64) error {
if value < 0 {
return fmt.Errorf("value for %s must be positive, got %d", label, value)
@ -119,3 +50,8 @@ func generateName(prefix string) string {
result = strings.ReplaceAll(result, "=", "")
return prefix + result
}
func isStringNilOrEmpty(str *string) bool {
return str == nil || len(*str) == 0
}

View File

@ -3,6 +3,7 @@ package rabbitmq_amqp
import (
"context"
"crypto/tls"
"fmt"
)
type TSaslMechanism string
@ -17,33 +18,52 @@ type SaslMechanism struct {
Type TSaslMechanism
}
type IConnectionSettings interface {
GetHost() string
Host(hostName string) IConnectionSettings
GetPort() int
Port(port int) IConnectionSettings
GetUser() string
User(userName string) IConnectionSettings
GetPassword() string
Password(password string) IConnectionSettings
GetVirtualHost() string
VirtualHost(virtualHost string) IConnectionSettings
GetScheme() string
GetContainerId() string
ContainerId(containerId string) IConnectionSettings
UseSsl(value bool) IConnectionSettings
IsSsl() bool
BuildAddress() string
TlsConfig(config *tls.Config) IConnectionSettings
GetTlsConfig() *tls.Config
GetSaslMechanism() TSaslMechanism
SaslMechanism(mechanism SaslMechanism) IConnectionSettings
type ConnectionSettings struct {
Host string
Port int
User string
Password string
VirtualHost string
Scheme string
ContainerId string
UseSsl bool
TlsConfig *tls.Config
SaslMechanism TSaslMechanism
}
func (c *ConnectionSettings) BuildAddress() string {
return c.Scheme + "://" + c.Host + ":" + fmt.Sprint(c.Port)
}
// NewConnectionSettings creates a new ConnectionSettings struct with default values.
func NewConnectionSettings() *ConnectionSettings {
return &ConnectionSettings{
Host: "localhost",
Port: 5672,
User: "guest",
Password: "guest",
VirtualHost: "/",
Scheme: "amqp",
ContainerId: "amqp-go-client",
UseSsl: false,
TlsConfig: nil,
}
}
type IConnection interface {
Open(ctx context.Context, connectionSettings IConnectionSettings) error
// Open opens a connection to the AMQP 1.0 server.
Open(ctx context.Context, connectionSettings *ConnectionSettings) error
// Close closes the connection to the AMQP 1.0 server.
Close(ctx context.Context) error
// Management returns the management interface for the connection.
Management() IManagement
// NotifyStatusChange registers a channel to receive status change notifications.
// The channel will receive a StatusChanged struct whenever the status of the connection changes.
NotifyStatusChange(channel chan *StatusChanged)
GetStatus() int
// Status returns the current status of the connection.
// See LifeCycle struct for more information.
Status() int
}

View File

@ -46,7 +46,7 @@ func CapacityFrom(value string) (int64, error) {
match, err := regexp.Compile("^((kb|mb|gb|tb))")
if err != nil {
return 0,
fmt.Errorf("Capacity, invalid unit size format:%s", value)
fmt.Errorf("capacity, invalid unit size format:%s", value)
}
foundUnitSize := strings.ToLower(value[len(value)-2:])
@ -55,7 +55,7 @@ func CapacityFrom(value string) (int64, error) {
size, err := strconv.Atoi(value[:len(value)-2])
if err != nil {
return 0, fmt.Errorf("Capacity, Invalid number format: %s", value)
return 0, fmt.Errorf("capacity, Invalid number format: %s", value)
}
switch foundUnitSize {
case UnitKb:
@ -72,5 +72,5 @@ func CapacityFrom(value string) (int64, error) {
}
}
return 0, fmt.Errorf("Capacity, Invalid unit size format: %s", value)
return 0, fmt.Errorf("capacity, Invalid unit size format: %s", value)
}

View File

@ -1,9 +1,5 @@
package rabbitmq_amqp
import (
"context"
)
type TQueueType string
const (
@ -20,38 +16,29 @@ func (e QueueType) String() string {
return string(e.Type)
}
type IEntityInfoSpecification[T any] interface {
Declare(ctx context.Context) (T, error)
Delete(ctx context.Context) error
}
type IQueueSpecification interface {
GetName() string
Exclusive(isExclusive bool) IQueueSpecification
IsExclusive() bool
AutoDelete(isAutoDelete bool) IQueueSpecification
IsAutoDelete() bool
IEntityInfoSpecification[IQueueInfo]
QueueType(queueType QueueType) IQueueSpecification
GetQueueType() TQueueType
MaxLengthBytes(length int64) IQueueSpecification
DeadLetterExchange(dlx string) IQueueSpecification
DeadLetterRoutingKey(dlrk string) IQueueSpecification
Purge(ctx context.Context) (int, error)
// QueueSpecification represents the specification of a queue
type QueueSpecification struct {
Name string
IsAutoDelete bool
IsExclusive bool
QueueType QueueType
MaxLengthBytes int64
DeadLetterExchange string
DeadLetterRoutingKey string
}
// IQueueInfo represents the information of a queue
// It is returned by the Declare method of IQueueSpecification
// The information come from the server
type IQueueInfo interface {
GetName() string
Name() string
IsDurable() bool
IsAutoDelete() bool
IsExclusive() bool
Type() TQueueType
GetLeader() string
GetReplicas() []string
GetArguments() map[string]any
Leader() string
Members() []string
Arguments() map[string]any
}
type TExchangeType string
@ -74,24 +61,18 @@ func (e ExchangeType) String() string {
// It is empty at the moment because the server does not return any information
// We leave it here for future use. In case the server returns information about an exchange
type IExchangeInfo interface {
GetName() string
Name() string
}
type IExchangeSpecification interface {
GetName() string
AutoDelete(isAutoDelete bool) IExchangeSpecification
IsAutoDelete() bool
IEntityInfoSpecification[IExchangeInfo]
ExchangeType(exchangeType ExchangeType) IExchangeSpecification
GetExchangeType() TExchangeType
type ExchangeSpecification struct {
Name string
IsAutoDelete bool
ExchangeType ExchangeType
}
type IBindingSpecification interface {
SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification
SourceExchangeName(exchangeName string) IBindingSpecification
DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification
DestinationQueueName(queueName string) IBindingSpecification
Key(bindingKey string) IBindingSpecification
Bind(ctx context.Context) error
Unbind(ctx context.Context) error
type BindingSpecification struct {
SourceExchange string
DestinationQueue string
DestinationExchange string
BindingKey string
}

View File

@ -1,6 +1,9 @@
package rabbitmq_amqp
import "sync"
import (
"fmt"
"sync"
)
const (
Open = iota
@ -9,11 +12,30 @@ const (
Closed = iota
)
func statusToString(status int) string {
switch status {
case Open:
return "Open"
case Reconnecting:
return "Reconnecting"
case Closing:
return "Closing"
case Closed:
return "Closed"
}
return "Unknown"
}
type StatusChanged struct {
From int
To int
}
func (s StatusChanged) String() string {
return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To))
}
type LifeCycle struct {
status int
chStatusChanged chan *StatusChanged

View File

@ -5,15 +5,40 @@ import (
)
type IManagement interface {
// Open setups the sender and receiver links to the management interface.
Open(ctx context.Context, connection IConnection) error
// Close closes the sender and receiver links to the management interface.
Close(ctx context.Context) error
Queue(queueName string) IQueueSpecification
// DeclareQueue creates a queue with the specified specification.
DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error)
// DeleteQueue deletes the queue with the specified name.
DeleteQueue(ctx context.Context, name string) error
// DeclareExchange creates an exchange with the specified specification.
DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error)
// DeleteExchange deletes the exchange with the specified name.
DeleteExchange(ctx context.Context, name string) error
//Bind creates a binding between an exchange and a queue or exchange
Bind(ctx context.Context, bindingSpecification *BindingSpecification) (string, error)
// Unbind removes a binding between an exchange and a queue or exchange given the binding path.
Unbind(ctx context.Context, bindingPath string) error
// PurgeQueue removes all messages from the queue. Returns the number of messages purged.
PurgeQueue(ctx context.Context, queueName string) (int, error)
// QueueInfo returns information about the queue with the specified name.
QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error)
Exchange(exchangeName string) IExchangeSpecification
Binding() IBindingSpecification
QueueClientName() IQueueSpecification
GetStatus() int
// Status returns the current status of the management interface.
// See LifeCycle struct for more information.
Status() int
// NotifyStatusChange registers a channel to receive status change notifications.
// The channel will receive a StatusChanged struct whenever the status of the management interface changes.
NotifyStatusChange(channel chan *StatusChanged)
//Request sends a request to the management interface with the specified body, path, and method.
//Returns the response body as a map[string]any.
//It usually is not necessary to call this method directly. Leave it public for custom use cases.
// The calls above are the recommended way to interact with the management interface.
Request(ctx context.Context, body any, path string, method string,
expectedResponseCodes []int) (map[string]any, error)
}