Implement publisher (#16)

* Implement publisher
* API refactor 
---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2024-11-21 10:34:08 +01:00 committed by GitHub
parent 60e006b2a3
commit 1a6679a201
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 760 additions and 357 deletions

View File

@ -3,38 +3,51 @@ package main
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"time"
)
func main() {
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
exchangeName := "getting-started-exchange"
queueName := "getting-started-queue"
routingKey := "routing-key"
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
/// Create a channel to receive status change notifications
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
go func(ch chan *rabbitmq_amqp.StatusChanged) {
for statusChanged := range ch {
fmt.Printf("%s\n", statusChanged)
}
}(chStatusChanged)
amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged)
err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings())
// Open a connection to the AMQP 1.0 server
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil)
if err != nil {
fmt.Printf("Error opening connection: %v\n", err)
return
}
// Register the channel to receive status change notifications
amqpConnection.NotifyStatusChange(chStatusChanged)
fmt.Printf("AMQP Connection opened.\n")
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
Name: "getting-started-exchange",
Name: exchangeName,
})
if err != nil {
fmt.Printf("Error declaring exchange: %v\n", err)
return
}
// Declare a Quorum queue
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
Name: "getting-started-queue",
Name: queueName,
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
})
@ -43,10 +56,11 @@ func main() {
return
}
// Bind the queue to the exchange
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
SourceExchange: exchangeInfo.Name(),
DestinationQueue: queueInfo.Name(),
BindingKey: "routing-key",
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
})
if err != nil {
@ -54,6 +68,33 @@ func main() {
return
}
addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
if err != nil {
fmt.Printf("Error creating publisher: %v\n", err)
return
}
// Publish a message to the exchange
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
if err != nil {
fmt.Printf("Error publishing message: %v\n", err)
return
}
println("press any key to close the connection")
var input string
_, _ = fmt.Scanln(&input)
// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
return
}
// Unbind the queue from the exchange
err = management.Unbind(context.TODO(), bindingPath)
if err != nil {
@ -67,6 +108,14 @@ func main() {
return
}
// Purge the queue
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error purging queue: %v\n", err)
return
}
fmt.Printf("Purged %d messages from the queue.\n", purged)
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error deleting queue: %v\n", err)

View File

@ -1,78 +0,0 @@
package rabbitmq_amqp
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Address builder test ", func() {
It("With exchange, queue and key should raise and error", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("queue").Exchange("exchange").Key("key")
_, err := addressBuilder.Address()
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
})
It("Without exchange and queue should raise and error", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
_, err := addressBuilder.Address()
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange or queue must be set"))
})
It("With exchange and key should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_exchange").Key("my_key")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
})
It("With exchange should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_exchange")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange"))
})
It("With exchange and key with names to encode should return the encoded address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_ exchange/()").Key("my_key ")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
})
It("With queue should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("my_queue>")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue%3E"))
})
It("With queue and append should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("my_queue").Append("/messages")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue/messages"))
})
})

View File

@ -7,66 +7,56 @@ import (
"strings"
)
type AddressBuilder struct {
queue *string
exchange *string
key *string
append *string
}
func NewAddressBuilder() *AddressBuilder {
return &AddressBuilder{}
}
func (a *AddressBuilder) Queue(queue string) *AddressBuilder {
a.queue = &queue
return a
}
func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder {
a.exchange = &exchange
return a
}
func (a *AddressBuilder) Key(key string) *AddressBuilder {
a.key = &key
return a
}
func (a *AddressBuilder) Append(append string) *AddressBuilder {
a.append = &append
return a
}
func (a *AddressBuilder) Address() (string, error) {
if a.exchange == nil && a.queue == nil {
// Address Creates the address for the exchange or queue following the RabbitMQ conventions.
// see: https://www.rabbitmq.com/docs/next/amqp#address-v2
func Address(exchange, key, queue *string, urlParameters *string) (string, error) {
if exchange == nil && queue == nil {
return "", errors.New("exchange or queue must be set")
}
urlAppend := ""
if !isStringNilOrEmpty(a.append) {
urlAppend = *a.append
if !isStringNilOrEmpty(urlParameters) {
urlAppend = *urlParameters
}
if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) {
if !isStringNilOrEmpty(exchange) && !isStringNilOrEmpty(queue) {
return "", errors.New("exchange and queue cannot be set together")
}
if !isStringNilOrEmpty(a.exchange) {
if !isStringNilOrEmpty(a.key) {
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil
if !isStringNilOrEmpty(exchange) {
if !isStringNilOrEmpty(key) {
return "/" + exchanges + "/" + encodePathSegments(*exchange) + "/" + encodePathSegments(*key) + urlAppend, nil
}
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil
return "/" + exchanges + "/" + encodePathSegments(*exchange) + urlAppend, nil
}
if a.queue == nil {
if queue == nil {
return "", nil
}
if isStringNilOrEmpty(a.queue) {
if isStringNilOrEmpty(queue) {
return "", errors.New("queue must be set")
}
return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil
return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil
}
// ExchangeAddress Creates the address for the exchange
// See Address for more information
func ExchangeAddress(exchange, key *string) (string, error) {
return Address(exchange, key, nil, nil)
}
// QueueAddress Creates the address for the queue.
// See Address for more information
func QueueAddress(queue *string) (string, error) {
return Address(nil, nil, queue, nil)
}
// PurgeQueueAddress Creates the address for purging the queue.
// See Address for more information
func PurgeQueueAddress(queue *string) (string, error) {
parameter := "/messages"
return Address(nil, nil, queue, &parameter)
}
// encodePathSegments takes a string and returns its percent-encoded representation.

View File

@ -0,0 +1,64 @@
package rabbitmq_amqp
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Address builder test ", func() {
It("With exchange, queue and key should raise and error", func() {
queue := "my_queue"
exchange := "my_exchange"
_, err := Address(&exchange, nil, &queue, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
})
It("Without exchange and queue should raise and error", func() {
_, err := Address(nil, nil, nil, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange or queue must be set"))
})
It("With exchange and key should return address", func() {
exchange := "my_exchange"
key := "my_key"
address, err := Address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
})
It("With exchange should return address", func() {
exchange := "my_exchange"
address, err := Address(&exchange, nil, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange"))
})
It("With exchange and key with names to encode should return the encoded address", func() {
exchange := "my_ exchange/()"
key := "my_key "
address, err := Address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
})
It("With queue should return address", func() {
queue := "my_queue>"
address, err := Address(nil, nil, &queue, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue%3E"))
})
It("With queue and urlParameters should return address", func() {
queue := "my_queue"
address, err := PurgeQueueAddress(&queue)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue/messages"))
})
})

View File

@ -10,14 +10,9 @@ var _ = Describe("AMQP Bindings test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
connection = NewAmqpConnection()
Expect(connection).NotTo(BeNil())
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := connection.Open(context.TODO(), connectionSettings)
conn, err := Dial(context.TODO(), "amqp://", nil)
Expect(err).To(BeNil())
connection = conn
management = connection.Management()
})

View File

@ -2,10 +2,11 @@ package rabbitmq_amqp
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
)
//func (c *ConnectionSettings) UseSsl(value bool) {
//func (c *ConnUrlHelper) UseSsl(value bool) {
// c.UseSsl = value
// if value {
// c.Scheme = "amqps"
@ -18,6 +19,15 @@ type AmqpConnection struct {
Connection *amqp.Conn
management IManagement
lifeCycle *LifeCycle
session *amqp.Session
}
func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (IPublisher, error) {
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName))
if err != nil {
return nil, err
}
return newPublisher(sender), nil
}
// Management returns the management interface for the connection.
@ -26,58 +36,61 @@ func (a *AmqpConnection) Management() IManagement {
return a.management
}
// NewAmqpConnection creates a new AmqpConnection
// Dial creates a new AmqpConnection
// with a new AmqpManagement and a new LifeCycle.
// Returns a pointer to the new AmqpConnection
func NewAmqpConnection() IConnection {
return &AmqpConnection{
func Dial(ctx context.Context, addr string, connOptions *amqp.ConnOptions) (IConnection, error) {
conn := &AmqpConnection{
management: NewAmqpManagement(),
lifeCycle: NewLifeCycle(),
}
}
// NewAmqpConnectionNotifyStatusChanged creates a new AmqpConnection
// with a new AmqpManagement and a new LifeCycle
// and sets the channel for status changes.
// Returns a pointer to the new AmqpConnection
func NewAmqpConnectionNotifyStatusChanged(channel chan *StatusChanged) IConnection {
lifeCycle := NewLifeCycle()
lifeCycle.chStatusChanged = channel
return &AmqpConnection{
management: NewAmqpManagement(),
lifeCycle: lifeCycle,
err := conn.open(ctx, addr, connOptions)
if err != nil {
return nil, err
}
return conn, nil
}
// Open opens a connection to the AMQP 1.0 server.
// using the provided connectionSettings and the AMQPLite library.
// Setups the connection and the management interface.
func (a *AmqpConnection) Open(ctx context.Context, connectionSettings *ConnectionSettings) error {
sASLType := amqp.SASLTypeAnonymous()
switch connectionSettings.SaslMechanism {
case Plain:
sASLType = amqp.SASLTypePlain(connectionSettings.User, connectionSettings.Password)
case External:
sASLType = amqp.SASLTypeExternal("")
func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amqp.ConnOptions) error {
if connOptions == nil {
connOptions = &amqp.ConnOptions{
// RabbitMQ requires SASL security layer
// to be enabled for AMQP 1.0 connections.
// So this is mandatory and default in case not defined.
SASLType: amqp.SASLTypeAnonymous(),
}
}
conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{
ContainerID: connectionSettings.ContainerId,
SASLType: sASLType,
HostName: connectionSettings.VirtualHost,
TLSConfig: connectionSettings.TlsConfig,
})
//connOptions.HostName is the way to set the virtual host
// so we need to pre-parse the URI to get the virtual host
// the PARSE is copied from go-amqp091 library
// the URI will be parsed is parsed again in the amqp lite library
uri, err := ParseURI(addr)
if err != nil {
return err
}
connOptions.HostName = fmt.Sprintf("vhost:%s", uri.Vhost)
conn, err := amqp.Dial(ctx, addr, connOptions)
if err != nil {
return err
}
a.Connection = conn
a.lifeCycle.SetStatus(Open)
a.session, err = a.Connection.NewSession(ctx, nil)
if err != nil {
return err
}
err = a.Management().Open(ctx, a)
if err != nil {
// TODO close connection?
return err
}
a.lifeCycle.SetStatus(Open)
return nil
}

View File

@ -2,6 +2,7 @@ package rabbitmq_amqp
import (
"context"
"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"time"
@ -9,94 +10,51 @@ import (
var _ = Describe("AMQP Connection Test", func() {
It("AMQP SASLTypeAnonymous Connection should succeed", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
connectionSettings.SaslMechanism = Anonymous
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
SASLType: amqp.SASLTypeAnonymous()})
Expect(err).To(BeNil())
err = amqpConnection.Close(context.Background())
err = connection.Close(context.Background())
Expect(err).To(BeNil())
})
It("AMQP SASLTypePlain Connection should succeed", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.SaslMechanism = Plain
connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("guest", "guest")})
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())
err = amqpConnection.Close(context.Background())
err = connection.Close(context.Background())
Expect(err).To(BeNil())
})
It("AMQP Connection should fail due of wrong Port", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := &ConnectionSettings{
Host: "localhost",
Port: 1234,
}
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
_, err := Dial(context.Background(), "amqp://localhost:1234", nil)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fail due of wrong Host", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := &ConnectionSettings{
Host: "wronghost",
Port: 5672,
}
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
_, err := Dial(context.Background(), "amqp://wrong_host:5672", nil)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fail due to context cancellation", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
cancel()
err := amqpConnection.Open(ctx, NewConnectionSettings())
_, err := Dial(ctx, "amqp://", nil)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should receive events", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
ch := make(chan *StatusChanged, 1)
amqpConnection.NotifyStatusChange(ch)
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
connection.NotifyStatusChange(ch)
err = connection.Close(context.Background())
Expect(err).To(BeNil())
recv := <-ch
Expect(recv).NotTo(BeNil())
Expect(recv.From).To(Equal(Closed))
Expect(recv.To).To(Equal(Open))
err = amqpConnection.Close(context.Background())
Expect(err).To(BeNil())
recv = <-ch
Expect(recv).NotTo(BeNil())
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
})
@ -106,13 +64,13 @@ var _ = Describe("AMQP Connection Test", func() {
// Expect(amqpConnection).NotTo(BeNil())
// Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
//
// connectionSettings := NewConnectionSettings().
// connectionSettings := NewConnUrlHelper().
// UseSsl(true).Port(5671).TlsConfig(&tls.Config{
// //ServerName: "localhost",
// InsecureSkipVerify: true,
// })
// Expect(connectionSettings).NotTo(BeNil())
// Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
// Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnUrlHelper{}))
// err := amqpConnection.Open(context.Background(), connectionSettings)
// Expect(err).To(BeNil())
//})

View File

@ -34,7 +34,7 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
}
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
path, err := NewAddressBuilder().Exchange(e.name).Address()
path, err := ExchangeAddress(&e.name, nil)
if err != nil {
return nil, err
}
@ -59,7 +59,7 @@ func (e *AmqpExchange) IsAutoDelete() bool {
}
func (e *AmqpExchange) Delete(ctx context.Context) error {
path, err := NewAddressBuilder().Exchange(e.name).Address()
path, err := ExchangeAddress(&e.name, nil)
if err != nil {
return err
}

View File

@ -11,13 +11,8 @@ var _ = Describe("AMQP Exchange test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
connection = NewAmqpConnection()
Expect(connection).NotTo(BeNil())
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := connection.Open(context.TODO(), connectionSettings)
conn, err := Dial(context.TODO(), "amqp://", nil)
connection = conn
Expect(err).To(BeNil())
management = connection.Management()
})

View File

@ -4,11 +4,10 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/Azure/go-amqp"
"github.com/google/uuid"
"strconv"
"time"
)
var ErrPreconditionFailed = errors.New("precondition Failed")
@ -30,18 +29,7 @@ func NewAmqpManagement() *AmqpManagement {
func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
if a.receiver == nil {
prop := make(map[string]any)
prop["paired"] = true
opts := &amqp.ReceiverOptions{
DynamicAddress: false,
Name: linkPairName,
Properties: prop,
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
TargetAddress: managementNodeAddress,
ExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
Credit: 100,
}
opts := createReceiverLinkOptions(managementNodeAddress, linkPairName)
receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts)
if err != nil {
return err
@ -54,19 +42,8 @@ func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error {
if a.sender == nil {
prop := make(map[string]any)
prop["paired"] = true
opts := &amqp.SenderOptions{
DynamicAddress: false,
ExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
ExpiryTimeout: 0,
Name: linkPairName,
Properties: prop,
SettlementMode: amqp.SenderSettleModeSettled.Ptr(),
RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(),
SourceAddress: managementNodeAddress,
}
sender, err := a.session.NewSender(ctx, managementNodeAddress, opts)
sender, err := a.session.NewSender(ctx, managementNodeAddress,
createSenderLinkOptions(managementNodeAddress, linkPairName))
if err != nil {
return err
}
@ -248,7 +225,7 @@ func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error {
return bind.Unbind(ctx, bindingPath)
}
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) {
path, err := NewAddressBuilder().Queue(queueName).Address()
path, err := QueueAddress(&queueName)
if err != nil {
return nil, err
}

View File

@ -11,51 +11,37 @@ import (
var _ = Describe("Management tests", func() {
It("AMQP Management should fail due to context cancellation", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
cancel()
err = amqpConnection.Management().Open(ctx, amqpConnection)
err = connection.Management().Open(ctx, connection)
Expect(err).NotTo(BeNil())
Expect(amqpConnection.Close(context.Background())).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Management should receive events", func() {
ch := make(chan *StatusChanged, 1)
amqpConnection := NewAmqpConnectionNotifyStatusChanged(ch)
Expect(amqpConnection).NotTo(BeNil())
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
connection.NotifyStatusChange(ch)
err = connection.Close(context.Background())
Expect(err).To(BeNil())
recv := <-ch
Expect(recv).NotTo(BeNil())
Expect(recv.From).To(Equal(Closed))
Expect(recv.To).To(Equal(Open))
err = amqpConnection.Close(context.Background())
Expect(err).To(BeNil())
recv = <-ch
Expect(recv).NotTo(BeNil())
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
Expect(amqpConnection.Close(context.Background())).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("Request", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
management := amqpConnection.Management()
management := connection.Management()
kv := make(map[string]any)
kv["durable"] = true
kv["auto_delete"] = false
@ -67,21 +53,15 @@ var _ = Describe("Management tests", func() {
Expect(err).To(BeNil())
Expect(result).NotTo(BeNil())
Expect(management.Close(context.Background())).To(BeNil())
Expect(amqpConnection.Close(context.Background())).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("GET on non-existing queue returns ErrDoesNotExist", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
management := amqpConnection.Management()
management := connection.Management()
path := "/queues/i-do-not-exist"
result, err := management.Request(context.Background(), amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
Expect(err).To(Equal(ErrDoesNotExist))

View File

@ -0,0 +1,38 @@
package rabbitmq_amqp
import (
"context"
"github.com/Azure/go-amqp"
)
type Publisher struct {
sender *amqp.Sender
}
func newPublisher(sender *amqp.Sender) *Publisher {
return &Publisher{sender: sender}
}
func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) error {
/// for the outcome of the message delivery, see https://github.com/Azure/go-amqp/issues/347
//RELEASED
///**
// * The broker could not route the message to any queue.
// *
// * <p>This is likely to be due to a topology misconfiguration.
// */
// so at the moment we don't have access on this information
// TODO: remove this comment when the issue is resolved
err := m.sender.Send(ctx, message, nil)
if err != nil {
return err
}
return nil
}
func (m *Publisher) Close(ctx context.Context) error {
return m.sender.Close(ctx)
}

View File

@ -0,0 +1,36 @@
package rabbitmq_amqp
import (
"context"
"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("AMQP publisher ", func() {
It("Send a message to a queue with a Message Target Publisher", func() {
qName := generateNameWithDateTime("Send a message to a queue with a Message Target Publisher")
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
dest, _ := QueueAddress(&qName)
publisher, err := connection.Publisher(context.Background(), dest, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
Expect(publisher).To(BeAssignableToTypeOf(&Publisher{}))
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
Expect(err).To(BeNil())
nMessages, err := connection.Management().PurgeQueue(context.Background(), qName)
Expect(err).To(BeNil())
Expect(nMessages).To(Equal(1))
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(publisher.Close(context.Background())).To(BeNil())
})
})

View File

@ -149,7 +149,7 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
a.name = generateNameWithDefaultPrefix()
}
path, err := NewAddressBuilder().Queue(a.name).Address()
path, err := QueueAddress(&a.name)
if err != nil {
return nil, err
}
@ -166,7 +166,7 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
}
func (a *AmqpQueue) Delete(ctx context.Context) error {
path, err := NewAddressBuilder().Queue(a.name).Address()
path, err := QueueAddress(&a.name)
if err != nil {
return err
}
@ -175,7 +175,7 @@ func (a *AmqpQueue) Delete(ctx context.Context) error {
}
func (a *AmqpQueue) Purge(ctx context.Context) (int, error) {
path, err := NewAddressBuilder().Queue(a.name).Append("/messages").Address()
path, err := PurgeQueueAddress(&a.name)
if err != nil {
return 0, err
}

View File

@ -13,14 +13,9 @@ var _ = Describe("AMQP Queue test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
connection = NewAmqpConnection()
Expect(connection).NotTo(BeNil())
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := connection.Open(context.TODO(), connectionSettings)
conn, err := Dial(context.TODO(), "amqp://", nil)
Expect(err).To(BeNil())
connection = conn
management = connection.Management()
})
@ -224,7 +219,7 @@ func publishMessages(queueName string, count int) {
Fail(err.Error())
}
address, err := NewAddressBuilder().Queue(queueName).Address()
address, err := QueueAddress(&queueName)
if err != nil {
Fail(err.Error())
}

View File

@ -0,0 +1,39 @@
package rabbitmq_amqp
import "github.com/Azure/go-amqp"
// senderLinkOptions returns the options for a sender link
// with the given address and link name.
// That should be the same for all the links.
func createSenderLinkOptions(address string, linkName string) *amqp.SenderOptions {
prop := make(map[string]any)
prop["paired"] = true
return &amqp.SenderOptions{
SourceAddress: address,
DynamicAddress: false,
ExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
ExpiryTimeout: 0,
Name: linkName,
Properties: prop,
SettlementMode: amqp.SenderSettleModeSettled.Ptr(),
RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(),
}
}
// receiverLinkOptions returns the options for a receiver link
// with the given address and link name.
// That should be the same for all the links.
func createReceiverLinkOptions(address string, linkName string) *amqp.ReceiverOptions {
prop := make(map[string]any)
prop["paired"] = true
return &amqp.ReceiverOptions{
TargetAddress: address,
DynamicAddress: false,
Name: linkName,
Properties: prop,
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
ExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
Credit: 100,
}
}

View File

@ -2,57 +2,10 @@ package rabbitmq_amqp
import (
"context"
"crypto/tls"
"fmt"
"github.com/Azure/go-amqp"
)
type TSaslMechanism string
const (
Plain TSaslMechanism = "plain"
External TSaslMechanism = "external"
Anonymous TSaslMechanism = "anonymous"
)
type SaslMechanism struct {
Type TSaslMechanism
}
type ConnectionSettings struct {
Host string
Port int
User string
Password string
VirtualHost string
Scheme string
ContainerId string
UseSsl bool
TlsConfig *tls.Config
SaslMechanism TSaslMechanism
}
func (c *ConnectionSettings) BuildAddress() string {
return c.Scheme + "://" + c.Host + ":" + fmt.Sprint(c.Port)
}
// NewConnectionSettings creates a new ConnectionSettings struct with default values.
func NewConnectionSettings() *ConnectionSettings {
return &ConnectionSettings{
Host: "localhost",
Port: 5672,
User: "guest",
Password: "guest",
VirtualHost: "/",
Scheme: "amqp",
ContainerId: "amqp-go-client",
UseSsl: false,
TlsConfig: nil,
}
}
type IConnection interface {
// Open opens a connection to the AMQP 1.0 server.
Open(ctx context.Context, connectionSettings *ConnectionSettings) error
// Close closes the connection to the AMQP 1.0 server.
Close(ctx context.Context) error
@ -66,4 +19,14 @@ type IConnection interface {
// Status returns the current status of the connection.
// See LifeCycle struct for more information.
Status() int
// Publisher returns a new IPublisher interface for the connection.
Publisher(ctx context.Context, destinationAddr string, linkName string) (IPublisher, error)
}
// IPublisher is an interface for publishers messages based.
// on the AMQP 1.0 protocol.
type IPublisher interface {
Publish(ctx context.Context, message *amqp.Message) error
Close(ctx context.Context) error
}

View File

@ -0,0 +1,12 @@
package rabbitmq_amqp
import (
"fmt"
"strconv"
"time"
)
func generateNameWithDateTime(name string) string {
return fmt.Sprintf("%s_%s", name, strconv.FormatInt(time.Now().Unix(), 10))
}

117
rabbitmq_amqp/uri.go Normal file
View File

@ -0,0 +1,117 @@
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rabbitmq_amqp
import (
"errors"
"net/url"
"strconv"
"strings"
)
var (
errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'")
errURIWhitespace = errors.New("URI must not contain whitespace")
)
var schemePorts = map[string]int{
"amqp": 5672,
"amqps": 5671,
}
var defaultURI = URI{
Scheme: "amqp",
Host: "localhost",
Port: 5672,
Username: "guest",
Password: "guest",
Vhost: "/",
}
// URI represents a parsed AMQP URI string.
type URI struct {
Scheme string
Host string
Port int
Username string
Password string
Vhost string
}
// ParseURI attempts to parse the given AMQP URI according to the spec.
// See http://www.rabbitmq.com/uri-spec.html.
//
// Default values for the fields are:
//
// Scheme: amqp
// Host: localhost
// Port: 5672
// Username: guest
// Password: guest
// Vhost: /
func ParseURI(uri string) (URI, error) {
builder := defaultURI
if strings.Contains(uri, " ") {
return builder, errURIWhitespace
}
u, err := url.Parse(uri)
if err != nil {
return builder, err
}
defaultPort, okScheme := schemePorts[u.Scheme]
if okScheme {
builder.Scheme = u.Scheme
} else {
return builder, errURIScheme
}
host := u.Hostname()
port := u.Port()
if host != "" {
builder.Host = host
}
if port != "" {
port32, err := strconv.ParseInt(port, 10, 32)
if err != nil {
return builder, err
}
builder.Port = int(port32)
} else {
builder.Port = defaultPort
}
if u.User != nil {
builder.Username = u.User.Username()
if password, ok := u.User.Password(); ok {
builder.Password = password
}
}
if u.Path != "" {
if strings.HasPrefix(u.Path, "/") {
if u.Host == "" && strings.HasPrefix(u.Path, "///") {
// net/url doesn't handle local context authorities and leaves that up
// to the scheme handler. In our case, we translate amqp:/// into the
// default host and whatever the vhost should be
if len(u.Path) > 3 {
builder.Vhost = u.Path[3:]
}
} else if len(u.Path) > 1 {
builder.Vhost = u.Path[1:]
}
} else {
builder.Vhost = u.Path
}
}
return builder, nil
}

260
rabbitmq_amqp/uri_test.go Normal file
View File

@ -0,0 +1,260 @@
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rabbitmq_amqp
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// Test matrix defined on http://www.rabbitmq.com/uri-spec.html
type testURI struct {
url string
username string
password string
host string
port int
vhost string
canon string
}
var uriTests = []testURI{
{
url: "amqp://user:pass@host:10000/vhost",
username: "user",
password: "pass",
host: "host",
port: 10000,
vhost: "vhost",
canon: "amqp://user:pass@host:10000/vhost",
},
{
url: "amqp://",
username: defaultURI.Username,
password: defaultURI.Password,
host: defaultURI.Host,
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://localhost/",
},
{
url: "amqp://:@/",
username: "",
password: "",
host: defaultURI.Host,
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://:@localhost/",
},
{
url: "amqp://user@",
username: "user",
password: defaultURI.Password,
host: defaultURI.Host,
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://user@localhost/",
},
{
url: "amqp://user:pass@",
username: "user",
password: "pass",
host: defaultURI.Host,
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://user:pass@localhost/",
},
{
url: "amqp://guest:pass@",
username: "guest",
password: "pass",
host: defaultURI.Host,
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://guest:pass@localhost/",
},
{
url: "amqp://host",
username: defaultURI.Username,
password: defaultURI.Password,
host: "host",
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://host/",
},
{
url: "amqp://:10000",
username: defaultURI.Username,
password: defaultURI.Password,
host: defaultURI.Host,
port: 10000,
vhost: defaultURI.Vhost,
canon: "amqp://localhost:10000/",
},
{
url: "amqp:///vhost",
username: defaultURI.Username,
password: defaultURI.Password,
host: defaultURI.Host,
port: defaultURI.Port,
vhost: "vhost",
canon: "amqp://localhost/vhost",
},
{
url: "amqp://host/",
username: defaultURI.Username,
password: defaultURI.Password,
host: "host",
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://host/",
},
{
url: "amqp://host/%2F",
username: defaultURI.Username,
password: defaultURI.Password,
host: "host",
port: defaultURI.Port,
vhost: "/",
canon: "amqp://host/",
},
{
url: "amqp://host/%2F%2F",
username: defaultURI.Username,
password: defaultURI.Password,
host: "host",
port: defaultURI.Port,
vhost: "//",
canon: "amqp://host/%2F%2F",
},
{
url: "amqp://host/%2Fslash%2F",
username: defaultURI.Username,
password: defaultURI.Password,
host: "host",
port: defaultURI.Port,
vhost: "/slash/",
canon: "amqp://host/%2Fslash%2F",
},
{
url: "amqp://192.168.1.1:1000/",
username: defaultURI.Username,
password: defaultURI.Password,
host: "192.168.1.1",
port: 1000,
vhost: defaultURI.Vhost,
canon: "amqp://192.168.1.1:1000/",
},
{
url: "amqp://[::1]",
username: defaultURI.Username,
password: defaultURI.Password,
host: "::1",
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://[::1]/",
},
{
url: "amqp://[::1]:1000",
username: defaultURI.Username,
password: defaultURI.Password,
host: "::1",
port: 1000,
vhost: defaultURI.Vhost,
canon: "amqp://[::1]:1000/",
},
{
url: "amqp://[fe80::1]",
username: defaultURI.Username,
password: defaultURI.Password,
host: "fe80::1",
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://[fe80::1]/",
},
{
url: "amqp://[fe80::1]",
username: defaultURI.Username,
password: defaultURI.Password,
host: "fe80::1",
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://[fe80::1]/",
},
{
url: "amqp://[fe80::1%25en0]",
username: defaultURI.Username,
password: defaultURI.Password,
host: "fe80::1%en0",
port: defaultURI.Port,
vhost: defaultURI.Vhost,
canon: "amqp://[fe80::1%25en0]/",
},
{
url: "amqp://[fe80::1]:5671",
username: defaultURI.Username,
password: defaultURI.Password,
host: "fe80::1",
port: 5671,
vhost: defaultURI.Vhost,
canon: "amqp://[fe80::1]:5671/",
},
{
url: "amqps:///",
username: defaultURI.Username,
password: defaultURI.Password,
host: defaultURI.Host,
port: schemePorts["amqps"],
vhost: defaultURI.Vhost,
canon: "amqps://localhost/",
},
{
url: "amqps://host:1000/",
username: defaultURI.Username,
password: defaultURI.Password,
host: "host",
port: 1000,
vhost: defaultURI.Vhost,
canon: "amqps://host:1000/",
},
}
var _ = Describe("Parse Test ", func() {
It("ParseURI", func() {
for _, test := range uriTests {
uri, err := ParseURI(test.url)
Expect(err).To(BeNil())
Expect(uri.Username).To(Equal(test.username))
Expect(uri.Password).To(Equal(test.password))
Expect(uri.Host).To(Equal(test.host))
Expect(uri.Port).To(Equal(test.port))
Expect(uri.Vhost).To(Equal(test.vhost))
}
})
})