Add multi uris support (#17)

* Multi uris configuration
*  remove the Interfaces
* Add outcome delivery
* refactor
* refactor state
---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2025-01-16 15:26:12 +01:00 committed by GitHub
parent 1a6679a201
commit 35b8893b93
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 387 additions and 271 deletions

View File

@ -9,29 +9,28 @@ import (
)
func main() {
exchangeName := "getting-started-exchange"
queueName := "getting-started-queue"
routingKey := "routing-key"
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client")
/// Create a channel to receive status change notifications
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
go func(ch chan *rabbitmq_amqp.StatusChanged) {
/// Create a channel to receive state change notifications
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
go func(ch chan *rabbitmq_amqp.StateChanged) {
for statusChanged := range ch {
fmt.Printf("%s\n", statusChanged)
rabbitmq_amqp.Info("[Connection]", "Status changed", statusChanged)
}
}(chStatusChanged)
}(stateChanged)
// Open a connection to the AMQP 1.0 server
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil)
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, nil)
if err != nil {
fmt.Printf("Error opening connection: %v\n", err)
rabbitmq_amqp.Error("Error opening connection", err)
return
}
// Register the channel to receive status change notifications
amqpConnection.NotifyStatusChange(chStatusChanged)
amqpConnection.NotifyStatusChange(stateChanged)
fmt.Printf("AMQP Connection opened.\n")
// Create the management interface for the connection
@ -41,7 +40,7 @@ func main() {
Name: exchangeName,
})
if err != nil {
fmt.Printf("Error declaring exchange: %v\n", err)
rabbitmq_amqp.Error("Error declaring exchange", err)
return
}
@ -52,7 +51,7 @@ func main() {
})
if err != nil {
fmt.Printf("Error declaring queue: %v\n", err)
rabbitmq_amqp.Error("Error declaring queue", err)
return
}
@ -64,7 +63,7 @@ func main() {
})
if err != nil {
fmt.Printf("Error binding: %v\n", err)
rabbitmq_amqp.Error("Error binding", err)
return
}
@ -72,16 +71,31 @@ func main() {
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
if err != nil {
fmt.Printf("Error creating publisher: %v\n", err)
rabbitmq_amqp.Error("Error creating publisher", err)
return
}
// Publish a message to the exchange
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
if err != nil {
fmt.Printf("Error publishing message: %v\n", err)
rabbitmq_amqp.Error("Error publishing message", err)
return
}
switch publishResult.Outcome {
case &amqp.StateAccepted{}:
rabbitmq_amqp.Info("Message accepted")
case &amqp.StateReleased{}:
rabbitmq_amqp.Warn("Message was not routed")
case &amqp.StateRejected{}:
rabbitmq_amqp.Warn("Message rejected")
stateType := publishResult.Outcome.(*amqp.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error)
}
default:
// these status are not supported
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
}
println("press any key to close the connection")
@ -132,5 +146,5 @@ func main() {
// Wait for the status change to be printed
time.Sleep(500 * time.Millisecond)
close(chStatusChanged)
close(stateChangeds)
}

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client
go 1.22.0
require (
github.com/Azure/go-amqp v1.2.0
github.com/Azure/go-amqp v1.4.0-beta.1
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2

6
go.sum
View File

@ -1,7 +1,5 @@
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A=
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.2.0 h1:NNyfN3/cRszWzMvjmm64yaPZDHX/2DJkowv8Ub9y01I=
github.com/Azure/go-amqp v1.2.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.4.0-beta.1 h1:BjZM/308FpfsQjX0gXtYK8Vx+WgQ1eng3oVQDEeXMmA=
github.com/Azure/go-amqp v1.4.0-beta.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=

View File

@ -3,7 +3,6 @@ package rabbitmq_amqp
import (
"errors"
"fmt"
"net/url"
"strings"
)
@ -77,16 +76,16 @@ func encodePathSegments(input string) string {
return encoded.String()
}
// Decode takes a percent-encoded string and returns its decoded representation.
func decode(input string) (string, error) {
// Use url.QueryUnescape which properly decodes percent-encoded strings
decoded, err := url.QueryUnescape(input)
if err != nil {
return "", err
}
return decoded, nil
}
//// Decode takes a percent-encoded string and returns its decoded representation.
//func decode(input string) (string, error) {
// // Use url.QueryUnescape which properly decodes percent-encoded strings
// decoded, err := url.QueryUnescape(input)
// if err != nil {
// return "", err
// }
//
// return decoded, nil
//}
// isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
@ -111,5 +110,8 @@ func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName,
}
format := "/%s/src=%s;%s=%s;key=%s;args="
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
}
func validateAddress(address string) bool {
return strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues))
}

View File

@ -7,10 +7,10 @@ import (
)
var _ = Describe("AMQP Bindings test ", func() {
var connection IConnection
var management IManagement
var connection *AmqpConnection
var management *AmqpManagement
BeforeEach(func() {
conn, err := Dial(context.TODO(), "amqp://", nil)
conn, err := Dial(context.TODO(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
connection = conn
management = connection.Management()

View File

@ -17,38 +17,50 @@ import (
type AmqpConnection struct {
Connection *amqp.Conn
management IManagement
management *AmqpManagement
lifeCycle *LifeCycle
session *amqp.Session
}
func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (IPublisher, error) {
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName))
func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (*Publisher, error) {
if !validateAddress(destinationAdd) {
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
}
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil {
return nil, err
}
return newPublisher(sender), nil
}
// Management returns the management interface for the connection.
// See IManagement interface.
func (a *AmqpConnection) Management() IManagement {
return a.management
}
// Dial creates a new AmqpConnection
// with a new AmqpManagement and a new LifeCycle.
// Returns a pointer to the new AmqpConnection
func Dial(ctx context.Context, addr string, connOptions *amqp.ConnOptions) (IConnection, error) {
// Dial connect to the AMQP 1.0 server using the provided connectionSettings
// Returns a pointer to the new AmqpConnection if successful else an error.
// addresses is a list of addresses to connect to. It picks one randomly.
// It is enough that one of the addresses is reachable.
func Dial(ctx context.Context, addresses []string, connOptions *amqp.ConnOptions) (*AmqpConnection, error) {
conn := &AmqpConnection{
management: NewAmqpManagement(),
lifeCycle: NewLifeCycle(),
}
err := conn.open(ctx, addr, connOptions)
if err != nil {
return nil, err
tmp := make([]string, len(addresses))
copy(tmp, addresses)
// random pick and extract one address to use for connection
for len(tmp) > 0 {
idx := random(len(tmp))
addr := tmp[idx]
// remove the index from the tmp list
tmp = append(tmp[:idx], tmp[idx+1:]...)
err := conn.open(ctx, addr, connOptions)
if err != nil {
Error("Failed to open connection", ExtractWithoutPassword(addr), err)
continue
}
Debug("Connected to", ExtractWithoutPassword(addr))
return conn, nil
}
return conn, nil
return nil, fmt.Errorf("no address to connect to")
}
// Open opens a connection to the AMQP 1.0 server.
@ -84,30 +96,42 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq
if err != nil {
return err
}
err = a.Management().Open(ctx, a)
err = a.management.Open(ctx, a)
if err != nil {
// TODO close connection?
return err
}
a.lifeCycle.SetStatus(Open)
a.lifeCycle.SetState(&StateOpen{})
return nil
}
func (a *AmqpConnection) Close(ctx context.Context) error {
err := a.Management().Close(ctx)
err := a.management.Close(ctx)
if err != nil {
return err
}
err = a.Connection.Close()
a.lifeCycle.SetStatus(Closed)
a.lifeCycle.SetState(&StateClosed{})
return err
}
func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) {
// NotifyStatusChange registers a channel to receive getState change notifications
// from the connection.
func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged) {
a.lifeCycle.chStatusChanged = channel
}
func (a *AmqpConnection) Status() int {
return a.lifeCycle.Status()
func (a *AmqpConnection) State() LifeCycleState {
return a.lifeCycle.State()
}
// *** management section ***
// Management returns the management interface for the connection.
// The management interface is used to declare and delete exchanges, queues, and bindings.
func (a *AmqpConnection) Management() *AmqpManagement {
return a.management
}
//*** end management section ***

View File

@ -11,7 +11,7 @@ import (
var _ = Describe("AMQP Connection Test", func() {
It("AMQP SASLTypeAnonymous Connection should succeed", func() {
connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{
SASLType: amqp.SASLTypeAnonymous()})
Expect(err).To(BeNil())
err = connection.Close(context.Background())
@ -20,7 +20,7 @@ var _ = Describe("AMQP Connection Test", func() {
It("AMQP SASLTypePlain Connection should succeed", func() {
connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("guest", "guest")})
Expect(err).To(BeNil())
@ -28,26 +28,37 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(err).To(BeNil())
})
It("AMQP Connection connect to the one correct uri and fails the others", func() {
conn, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://"}, nil)
Expect(err).To(BeNil())
Expect(conn.Close(context.Background()))
})
It("AMQP Connection should fail due of wrong Port", func() {
_, err := Dial(context.Background(), "amqp://localhost:1234", nil)
_, err := Dial(context.Background(), []string{"amqp://localhost:1234"}, nil)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fail due of wrong Host", func() {
_, err := Dial(context.Background(), "amqp://wrong_host:5672", nil)
_, err := Dial(context.Background(), []string{"amqp://wrong_host:5672"}, nil)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fails with all the wrong uris", func() {
_, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://nono"}, nil)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fail due to context cancellation", func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
cancel()
_, err := Dial(ctx, "amqp://", nil)
_, err := Dial(ctx, []string{"amqp://"}, nil)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should receive events", func() {
ch := make(chan *StatusChanged, 1)
connection, err := Dial(context.Background(), "amqp://", nil)
ch := make(chan *StateChanged, 1)
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
connection.NotifyStatusChange(ch)
err = connection.Close(context.Background())
@ -55,8 +66,8 @@ var _ = Describe("AMQP Connection Test", func() {
recv := <-ch
Expect(recv).NotTo(BeNil())
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
Expect(recv.From).To(Equal(&StateOpen{}))
Expect(recv.To).To(Equal(&StateClosed{}))
})
//It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() {

View File

@ -9,7 +9,7 @@ type AmqpExchangeInfo struct {
name string
}
func newAmqpExchangeInfo(name string) IExchangeInfo {
func newAmqpExchangeInfo(name string) *AmqpExchangeInfo {
return &AmqpExchangeInfo{name: name}
}
@ -33,7 +33,7 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
}
}
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error) {
path, err := ExchangeAddress(&e.name, nil)
if err != nil {
return nil, err

View File

@ -8,10 +8,10 @@ import (
)
var _ = Describe("AMQP Exchange test ", func() {
var connection IConnection
var management IManagement
var connection *AmqpConnection
var management *AmqpManagement
BeforeEach(func() {
conn, err := Dial(context.TODO(), "amqp://", nil)
conn, err := Dial(context.TODO(), []string{"amqp://"}, nil)
connection = conn
Expect(err).To(BeNil())
management = connection.Management()

View File

@ -43,7 +43,7 @@ func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error {
if a.sender == nil {
sender, err := a.session.NewSender(ctx, managementNodeAddress,
createSenderLinkOptions(managementNodeAddress, linkPairName))
createSenderLinkOptions(managementNodeAddress, linkPairName, AtMostOnce))
if err != nil {
return err
}
@ -54,8 +54,8 @@ func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error {
return nil
}
func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error {
session, err := connection.(*AmqpConnection).Connection.NewSession(ctx, nil)
func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error {
session, err := connection.Connection.NewSession(ctx, nil)
if err != nil {
return err
}
@ -77,7 +77,7 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error
// some channels or I/O or something elsewhere
time.Sleep(time.Millisecond * 10)
a.lifeCycle.SetStatus(Open)
a.lifeCycle.SetState(&StateOpen{})
return ctx.Err()
}
@ -85,7 +85,7 @@ func (a *AmqpManagement) Close(ctx context.Context) error {
_ = a.sender.Close(ctx)
_ = a.receiver.Close(ctx)
err := a.session.Close(ctx)
a.lifeCycle.SetStatus(Closed)
a.lifeCycle.SetState(&StateClosed{})
return err
}
@ -170,7 +170,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
return make(map[string]any), nil
}
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error) {
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification *QueueSpecification) (*AmqpQueueInfo, error) {
var amqpQueue *AmqpQueue
if specification == nil || len(specification.Name) <= 0 {
@ -195,7 +195,7 @@ func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error {
return q.Delete(ctx)
}
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error) {
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (*AmqpExchangeInfo, error) {
if exchangeSpecification == nil {
return nil, fmt.Errorf("exchangeSpecification is nil")
}
@ -224,7 +224,7 @@ func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error {
bind := newAMQPBinding(a)
return bind.Unbind(ctx, bindingPath)
}
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) {
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error) {
path, err := QueueAddress(&queueName)
if err != nil {
return nil, err
@ -241,10 +241,10 @@ func (a *AmqpManagement) PurgeQueue(ctx context.Context, queueName string) (int,
return purge.Purge(ctx)
}
func (a *AmqpManagement) NotifyStatusChange(channel chan *StatusChanged) {
func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged) {
a.lifeCycle.chStatusChanged = channel
}
func (a *AmqpManagement) Status() int {
return a.lifeCycle.Status()
func (a *AmqpManagement) State() LifeCycleState {
return a.lifeCycle.State()
}

View File

@ -11,7 +11,7 @@ import (
var _ = Describe("Management tests", func() {
It("AMQP Management should fail due to context cancellation", func() {
connection, err := Dial(context.Background(), "amqp://", nil)
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
@ -22,8 +22,8 @@ var _ = Describe("Management tests", func() {
})
It("AMQP Management should receive events", func() {
ch := make(chan *StatusChanged, 1)
connection, err := Dial(context.Background(), "amqp://", nil)
ch := make(chan *StateChanged, 1)
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
connection.NotifyStatusChange(ch)
err = connection.Close(context.Background())
@ -31,14 +31,14 @@ var _ = Describe("Management tests", func() {
recv := <-ch
Expect(recv).NotTo(BeNil())
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
Expect(recv.From).To(Equal(&StateOpen{}))
Expect(recv.To).To(Equal(&StateClosed{}))
Expect(connection.Close(context.Background())).To(BeNil())
})
It("Request", func() {
connection, err := Dial(context.Background(), "amqp://", nil)
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
management := connection.Management()
@ -58,7 +58,7 @@ var _ = Describe("Management tests", func() {
It("GET on non-existing queue returns ErrDoesNotExist", func() {
connection, err := Dial(context.Background(), "amqp://", nil)
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
management := connection.Management()

View File

@ -5,6 +5,11 @@ import (
"github.com/Azure/go-amqp"
)
type PublishResult struct {
Outcome amqp.DeliveryState
Message *amqp.Message
}
type Publisher struct {
sender *amqp.Sender
}
@ -13,26 +18,33 @@ func newPublisher(sender *amqp.Sender) *Publisher {
return &Publisher{sender: sender}
}
func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) error {
/// for the outcome of the message delivery, see https://github.com/Azure/go-amqp/issues/347
//RELEASED
///**
// * The broker could not route the message to any queue.
// *
// * <p>This is likely to be due to a topology misconfiguration.
// */
// so at the moment we don't have access on this information
// TODO: remove this comment when the issue is resolved
err := m.sender.Send(ctx, message, nil)
// Publish sends a message to the destination address.
// The message is sent to the destination address and the outcome of the operation is returned.
// The outcome is a DeliveryState that indicates if the message was accepted or rejected.
// RabbitMQ supports the following DeliveryState types:
// - StateAccepted
// - StateReleased
// - StateRejected
// See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) {
r, err := m.sender.SendWithReceipt(ctx, message, nil)
if err != nil {
return err
return nil, err
}
return nil
state, err := r.Wait(ctx)
if err != nil {
return nil, err
}
publishResult := &PublishResult{
Message: message,
Outcome: state,
}
return publishResult, err
}
// Close closes the publisher.
func (m *Publisher) Close(ctx context.Context) error {
return m.sender.Close(ctx)
}

View File

@ -10,7 +10,7 @@ import (
var _ = Describe("AMQP publisher ", func() {
It("Send a message to a queue with a Message Target Publisher", func() {
qName := generateNameWithDateTime("Send a message to a queue with a Message Target Publisher")
connection, err := Dial(context.Background(), "amqp://", nil)
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
@ -24,13 +24,90 @@ var _ = Describe("AMQP publisher ", func() {
Expect(publisher).NotTo(BeNil())
Expect(publisher).To(BeAssignableToTypeOf(&Publisher{}))
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
nMessages, err := connection.Management().PurgeQueue(context.Background(), qName)
Expect(err).To(BeNil())
Expect(nMessages).To(Equal(1))
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(publisher.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
})
It("Publisher should fail to a not existing exchange", func() {
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
exchangeName := "Nope"
addr, err := ExchangeAddress(&exchangeName, nil)
Expect(err).To(BeNil())
publisher, err := connection.Publisher(context.Background(), addr, "test")
Expect(err).NotTo(BeNil())
Expect(publisher).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("Publisher should fail if the destination address does not start in the correct way", func() {
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
destinationAddress := "this is not valid since does not start with exchanges or queues"
Expect(err).To(BeNil())
publisher, err := connection.Publisher(context.Background(), destinationAddress, "test")
Expect(err).NotTo(BeNil())
Expect(publisher).To(BeNil())
Expect(err.Error()).To(ContainSubstring("invalid destination address"))
Expect(connection.Close(context.Background())).To(BeNil())
})
It("publishResult should released to a not existing routing key", func() {
eName := generateNameWithDateTime("publishResult should released to a not existing routing key")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
exchange, err := connection.Management().DeclareExchange(context.Background(), &ExchangeSpecification{
Name: eName,
IsAutoDelete: false,
ExchangeType: ExchangeType{Type: Topic},
})
Expect(err).To(BeNil())
Expect(exchange).NotTo(BeNil())
routingKeyNope := "I don't exist"
addr, err := ExchangeAddress(&eName, &routingKeyNope)
Expect(err).To(BeNil())
publisher, err := connection.Publisher(context.Background(), addr, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{}))
Expect(connection.Management().DeleteExchange(context.Background(), eName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("Send a message to a deleted queue should fail", func() {
qName := generateNameWithDateTime("Send a message to a deleted queue should fail")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
_, err = connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
dest, _ := QueueAddress(&qName)
publisher, err := connection.Publisher(context.Background(), dest, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
Expect(err).To(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
err = connection.management.DeleteQueue(context.Background(), qName)
Expect(err).To(BeNil())
publishResult, err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
Expect(err).NotTo(BeNil())
Expect(connection.Close(context.Background()))
})
})

View File

@ -25,7 +25,7 @@ func (a *AmqpQueueInfo) Members() []string {
return a.members
}
func newAmqpQueueInfo(response map[string]any) IQueueInfo {
func newAmqpQueueInfo(response map[string]any) *AmqpQueueInfo {
return &AmqpQueueInfo{
name: response["name"].(string),
isDurable: response["durable"].(bool),
@ -133,7 +133,7 @@ func (a *AmqpQueue) validate() error {
return nil
}
func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error) {
if Quorum == a.GetQueueType() ||
Stream == a.GetQueueType() {
// mandatory arguments for quorum queues and streams

View File

@ -10,10 +10,10 @@ import (
)
var _ = Describe("AMQP Queue test ", func() {
var connection IConnection
var management IManagement
var connection *AmqpConnection
var management *AmqpManagement
BeforeEach(func() {
conn, err := Dial(context.TODO(), "amqp://", nil)
conn, err := Dial(context.TODO(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
connection = conn
management = connection.Management()
@ -148,7 +148,7 @@ var _ = Describe("AMQP Queue test ", func() {
It("AMQP Declare Queue should fail with Precondition fail", func() {
// The first queue is declared as Classic, and it should succeed
// The second queue is declared as Quorum, and it should fail since it is already declared as Classic
const queueName = "AMQP Declare Queue should fail with Precondition fail"
queueName := generateName("AMQP Declare Queue should fail with Precondition fail")
_, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
@ -168,7 +168,7 @@ var _ = Describe("AMQP Queue test ", func() {
})
It("AMQP Declare Queue should fail during validation", func() {
const queueName = "AMQP Declare Queue should fail during validation"
queueName := generateName("AMQP Declare Queue should fail during validation")
_, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
MaxLengthBytes: -1,
@ -188,7 +188,7 @@ var _ = Describe("AMQP Queue test ", func() {
})
It("AMQP Purge Queue should succeed and return the number of messages purged", func() {
const queueName = "AMQP Purge Queue should succeed and return the number of messages purged"
queueName := generateName("AMQP Purge Queue should succeed and return the number of messages purged")
queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
Name: queueName,
})
@ -197,6 +197,8 @@ var _ = Describe("AMQP Queue test ", func() {
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
Expect(err).To(BeNil())
Expect(purged).To(Equal(10))
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
It("AMQP GET on non-existing queue should return ErrDoesNotExist", func() {
@ -207,33 +209,24 @@ var _ = Describe("AMQP Queue test ", func() {
})
})
// TODO: This should be replaced with this library's publish function
// but for the time being, we need a way to publish messages or test purposes
func publishMessages(queueName string, count int) {
conn, err := amqp.Dial(context.TODO(), "amqp://guest:guest@localhost", nil)
if err != nil {
Fail(err.Error())
}
session, err := conn.NewSession(context.TODO(), nil)
if err != nil {
Fail(err.Error())
}
conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil)
Expect(err).To(BeNil())
address, err := QueueAddress(&queueName)
if err != nil {
Fail(err.Error())
}
Expect(err).To(BeNil())
sender, err := session.NewSender(context.TODO(), address, nil)
if err != nil {
Fail(err.Error())
}
publisher, err := conn.Publisher(context.TODO(), address, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
for i := 0; i < count; i++ {
err = sender.Send(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))), nil)
if err != nil {
Fail(err.Error())
}
publishResult, err := publisher.Publish(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))))
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
}
err = conn.Close(context.TODO())
Expect(err).To(BeNil())
}

View File

@ -1,13 +1,29 @@
package rabbitmq_amqp
import "github.com/Azure/go-amqp"
import (
"github.com/Azure/go-amqp"
"math/rand"
"time"
)
const AtMostOnce = 0
const AtLeastOnce = 1
// senderLinkOptions returns the options for a sender link
// with the given address and link name.
// That should be the same for all the links.
func createSenderLinkOptions(address string, linkName string) *amqp.SenderOptions {
func createSenderLinkOptions(address string, linkName string, deliveryMode int) *amqp.SenderOptions {
prop := make(map[string]any)
prop["paired"] = true
sndSettleMode := amqp.SenderSettleModeSettled.Ptr()
/// SndSettleMode = deliveryMode == DeliveryMode.AtMostOnce
// ? SenderSettleMode.Settled
// : SenderSettleMode.Unsettled,
if deliveryMode == AtLeastOnce {
sndSettleMode = amqp.SenderSettleModeUnsettled.Ptr()
}
return &amqp.SenderOptions{
SourceAddress: address,
DynamicAddress: false,
@ -15,7 +31,7 @@ func createSenderLinkOptions(address string, linkName string) *amqp.SenderOption
ExpiryTimeout: 0,
Name: linkName,
Properties: prop,
SettlementMode: amqp.SenderSettleModeSettled.Ptr(),
SettlementMode: sndSettleMode,
RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(),
}
}
@ -37,3 +53,8 @@ func createReceiverLinkOptions(address string, linkName string) *amqp.ReceiverOp
Credit: 100,
}
}
func random(max int) int {
r := rand.New(rand.NewSource(time.Now().Unix()))
return r.Intn(max)
}

View File

@ -1,32 +0,0 @@
package rabbitmq_amqp
import (
"context"
"github.com/Azure/go-amqp"
)
type IConnection interface {
// Close closes the connection to the AMQP 1.0 server.
Close(ctx context.Context) error
// Management returns the management interface for the connection.
Management() IManagement
// NotifyStatusChange registers a channel to receive status change notifications.
// The channel will receive a StatusChanged struct whenever the status of the connection changes.
NotifyStatusChange(channel chan *StatusChanged)
// Status returns the current status of the connection.
// See LifeCycle struct for more information.
Status() int
// Publisher returns a new IPublisher interface for the connection.
Publisher(ctx context.Context, destinationAddr string, linkName string) (IPublisher, error)
}
// IPublisher is an interface for publishers messages based.
// on the AMQP 1.0 protocol.
type IPublisher interface {
Publish(ctx context.Context, message *amqp.Message) error
Close(ctx context.Context) error
}

View File

@ -27,20 +27,6 @@ type QueueSpecification struct {
DeadLetterRoutingKey string
}
// IQueueInfo represents the information of a queue
// It is returned by the Declare method of IQueueSpecification
// The information come from the server
type IQueueInfo interface {
Name() string
IsDurable() bool
IsAutoDelete() bool
IsExclusive() bool
Type() TQueueType
Leader() string
Members() []string
Arguments() map[string]any
}
type TExchangeType string
const (
@ -57,13 +43,6 @@ func (e ExchangeType) String() string {
return string(e.Type)
}
// IExchangeInfo represents the information of an exchange
// It is empty at the moment because the server does not return any information
// We leave it here for future use. In case the server returns information about an exchange
type IExchangeInfo interface {
Name() string
}
type ExchangeSpecification struct {
Name string
IsAutoDelete bool

View File

@ -5,70 +5,102 @@ import (
"sync"
)
type LifeCycleState interface {
getState() int
}
type StateOpen struct {
}
func (o *StateOpen) getState() int {
return open
}
type StateReconnecting struct {
}
func (r *StateReconnecting) getState() int {
return reconnecting
}
type StateClosing struct {
}
func (c *StateClosing) getState() int {
return closing
}
type StateClosed struct {
}
func (c *StateClosed) getState() int {
return closed
}
const (
Open = iota
Reconnecting = iota
Closing = iota
Closed = iota
open = iota
reconnecting = iota
closing = iota
closed = iota
)
func statusToString(status int) string {
switch status {
case Open:
return "Open"
case Reconnecting:
return "Reconnecting"
case Closing:
return "Closing"
case Closed:
return "Closed"
func statusToString(status LifeCycleState) string {
switch status.getState() {
case open:
return "open"
case reconnecting:
return "reconnecting"
case closing:
return "closing"
case closed:
return "closed"
}
return "Unknown"
return "unknown"
}
type StatusChanged struct {
From int
To int
type StateChanged struct {
From LifeCycleState
To LifeCycleState
}
func (s StatusChanged) String() string {
func (s StateChanged) String() string {
return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To))
}
type LifeCycle struct {
status int
chStatusChanged chan *StatusChanged
state LifeCycleState
chStatusChanged chan *StateChanged
mutex *sync.Mutex
}
func NewLifeCycle() *LifeCycle {
return &LifeCycle{
status: Closed,
mutex: &sync.Mutex{},
state: &StateClosed{},
mutex: &sync.Mutex{},
}
}
func (l *LifeCycle) Status() int {
func (l *LifeCycle) State() LifeCycleState {
l.mutex.Lock()
defer l.mutex.Unlock()
return l.status
return l.state
}
func (l *LifeCycle) SetStatus(value int) {
func (l *LifeCycle) SetState(value LifeCycleState) {
l.mutex.Lock()
defer l.mutex.Unlock()
if l.status == value {
if l.state == value {
return
}
oldState := l.status
l.status = value
oldState := l.state
l.state = value
if l.chStatusChanged == nil {
return
}
l.chStatusChanged <- &StatusChanged{
l.chStatusChanged <- &StateChanged{
From: oldState,
To: value,
}

19
rabbitmq_amqp/log.go Normal file
View File

@ -0,0 +1,19 @@
package rabbitmq_amqp
import "log/slog"
func Info(msg string, args ...any) {
slog.Info(msg, args...)
}
func Debug(msg string, args ...any) {
slog.Debug(msg, args...)
}
func Error(msg string, args ...any) {
slog.Error(msg, args...)
}
func Warn(msg string, args ...any) {
slog.Warn(msg, args...)
}

View File

@ -1,44 +0,0 @@
package rabbitmq_amqp
import (
"context"
)
type IManagement interface {
// Open setups the sender and receiver links to the management interface.
Open(ctx context.Context, connection IConnection) error
// Close closes the sender and receiver links to the management interface.
Close(ctx context.Context) error
// DeclareQueue creates a queue with the specified specification.
DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error)
// DeleteQueue deletes the queue with the specified name.
DeleteQueue(ctx context.Context, name string) error
// DeclareExchange creates an exchange with the specified specification.
DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error)
// DeleteExchange deletes the exchange with the specified name.
DeleteExchange(ctx context.Context, name string) error
//Bind creates a binding between an exchange and a queue or exchange
Bind(ctx context.Context, bindingSpecification *BindingSpecification) (string, error)
// Unbind removes a binding between an exchange and a queue or exchange given the binding path.
Unbind(ctx context.Context, bindingPath string) error
// PurgeQueue removes all messages from the queue. Returns the number of messages purged.
PurgeQueue(ctx context.Context, queueName string) (int, error)
// QueueInfo returns information about the queue with the specified name.
QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error)
// Status returns the current status of the management interface.
// See LifeCycle struct for more information.
Status() int
// NotifyStatusChange registers a channel to receive status change notifications.
// The channel will receive a StatusChanged struct whenever the status of the management interface changes.
NotifyStatusChange(channel chan *StatusChanged)
//Request sends a request to the management interface with the specified body, path, and method.
//Returns the response body as a map[string]any.
//It usually is not necessary to call this method directly. Leave it public for custom use cases.
// The calls above are the recommended way to interact with the management interface.
Request(ctx context.Context, body any, path string, method string,
expectedResponseCodes []int) (map[string]any, error)
}

View File

@ -115,3 +115,13 @@ func ParseURI(uri string) (URI, error) {
return builder, nil
}
// Extract the Uri by omitting the password
func ExtractWithoutPassword(addr string) string {
u, err := ParseURI(addr)
if err != nil {
return ""
}
return u.Scheme + "://" + u.Username + "@*****" + u.Host + ":" + strconv.Itoa(u.Port) + u.Vhost
}