First implementation for Exchanges and bindings (#2)

* First implementation for  Exchanges and bindings
---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2024-09-11 14:42:05 +02:00 committed by GitHub
parent fa7d5d9413
commit 05f7cd9fbb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 436 additions and 65 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt"
mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"os"
"time"
)
func main() {
@ -30,28 +31,64 @@ func main() {
management := amqpConnection.Management()
queueSpec := management.Queue("getting_started_queue").
QueueType(mq.QueueType{Type: mq.Quorum}).
MaxLengthBytes(mq.CapacityGB(1)).
DeadLetterExchange("dead-letter-exchange").
DeadLetterRoutingKey("dead-letter-routing-key")
MaxLengthBytes(mq.CapacityGB(1))
exchangeSpec := management.Exchange("getting_started_exchange").
ExchangeType(mq.ExchangeType{Type: mq.Topic})
queueInfo, err := queueSpec.Declare(context.Background())
if err != nil {
fmt.Printf("Error declaring queue %s\n", err)
return
}
fmt.Printf("Queue %s created.\n", queueInfo.GetName())
exchangeInfo, err := exchangeSpec.Declare(context.Background())
if err != nil {
fmt.Printf("Error declaring exchange %s\n", err)
return
}
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName())
bindingSpec := management.Binding().SourceExchange(exchangeInfo.GetName()).DestinationQueue(queueInfo.GetName()).Key("routing-key")
err = bindingSpec.Bind(context.Background())
if err != nil {
fmt.Printf("Error binding %s\n", err)
return
}
fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName())
fmt.Println("Press any key to cleanup and exit")
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadString('\n')
err = bindingSpec.Unbind(context.Background())
if err != nil {
fmt.Printf("Error unbinding %s\n", err)
return
}
fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName())
err = exchangeSpec.Delete(context.Background())
if err != nil {
fmt.Printf("Error deleting exchange %s\n", err)
return
}
err = queueSpec.Delete(context.Background())
if err != nil {
return
}
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName())
fmt.Println("Press any key to stop ")
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadString('\n')
err = amqpConnection.Close(context.Background())
if err != nil {
return
}
fmt.Printf("AMQP Connection closed.\n")
// Wait for the status change to be printed
time.Sleep(500 * time.Millisecond)
close(chStatusChanged)
}

View File

@ -0,0 +1,51 @@
package rabbitmq_amqp
import "context"
type AMQPBindingInfo struct {
}
type AMQPBinding struct {
sourceExchangeName string
destinationQueue string
bindingKey string
management *AmqpManagement
}
func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
return &AMQPBinding{management: management}
}
func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification {
b.bindingKey = bindingKey
return b
}
func (b *AMQPBinding) SourceExchange(exchangeName string) IBindingSpecification {
b.sourceExchangeName = exchangeName
return b
}
func (b *AMQPBinding) DestinationQueue(queueName string) IBindingSpecification {
b.destinationQueue = queueName
return b
}
func (b *AMQPBinding) Bind(ctx context.Context) error {
path := bindingPath()
kv := make(map[string]any)
kv["binding_key"] = b.bindingKey
kv["source"] = b.sourceExchangeName
kv["destination_queue"] = b.destinationQueue
kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
return err
}
func (b *AMQPBinding) Unbind(ctx context.Context) error {
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.sourceExchangeName, b.destinationQueue, b.bindingKey)
_, err := b.management.Request(ctx, nil, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204})
return err
}

View File

@ -0,0 +1,58 @@
package rabbitmq_amqp
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("AMQP Bindings test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
connection = NewAmqpConnection()
Expect(connection).NotTo(BeNil())
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := connection.Open(context.TODO(), connectionSettings)
Expect(err).To(BeNil())
management = connection.Management()
})
AfterEach(func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Bindings between Exchange and Queue Should success ", func() {
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success"
const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success"
exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO())
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))
bindingSpec := management.Binding().SourceExchange(exchangeName).
DestinationQueue(queueName).
Key("routing-key")
err = bindingSpec.Bind(context.TODO())
Expect(err).To(BeNil())
err = bindingSpec.Unbind(context.TODO())
Expect(err).To(BeNil())
err = exchangeSpec.Delete(context.TODO())
Expect(err).To(BeNil())
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
})

View File

@ -8,15 +8,25 @@ import (
)
type ConnectionSettings struct {
host string
port int
user string
password string
virtualHost string
scheme string
containerId string
useSsl bool
tlsConfig *tls.Config
host string
port int
user string
password string
virtualHost string
scheme string
containerId string
useSsl bool
tlsConfig *tls.Config
saslMechanism TSaslMechanism
}
func (c *ConnectionSettings) GetSaslMechanism() TSaslMechanism {
return c.saslMechanism
}
func (c *ConnectionSettings) SaslMechanism(mechanism SaslMechanism) IConnectionSettings {
c.saslMechanism = mechanism.Type
return c
}
func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings {
@ -138,8 +148,13 @@ func NewAmqpConnection() IConnection {
}
func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error {
// TODO: add support for other SASL types
sASLType := amqp.SASLTypeAnonymous()
switch connectionSettings.GetSaslMechanism() {
case Plain:
sASLType = amqp.SASLTypePlain(connectionSettings.GetUser(), connectionSettings.GetPassword())
case External:
sASLType = amqp.SASLTypeExternal("")
}
conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{
ContainerID: connectionSettings.GetContainerId(),

View File

@ -8,7 +8,20 @@ import (
)
var _ = Describe("AMQP Connection Test", func() {
It("AMQP Connection should success", func() {
It("AMQP SASLTypeAnonymous Connection should success", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous})
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.TODO(), connectionSettings)
Expect(err).To(BeNil())
})
It("AMQP SASLTypePlain Connection should success", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@ -16,6 +29,7 @@ var _ = Describe("AMQP Connection Test", func() {
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.SaslMechanism(SaslMechanism{Type: Plain})
err := amqpConnection.Open(context.TODO(), connectionSettings)
Expect(err).To(BeNil())
})

View File

@ -0,0 +1,74 @@
package rabbitmq_amqp
import "context"
type AmqpExchangeInfo struct {
name string
}
func newAmqpExchangeInfo(name string) IExchangeInfo {
return &AmqpExchangeInfo{name: name}
}
func (a *AmqpExchangeInfo) GetName() string {
return a.name
}
type AmqpExchange struct {
name string
management *AmqpManagement
arguments map[string]any
isAutoDelete bool
exchangeType ExchangeType
}
func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
return &AmqpExchange{management: management,
name: name,
arguments: make(map[string]any),
exchangeType: ExchangeType{Type: Direct},
}
}
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
path := exchangePath(e.name)
kv := make(map[string]any)
kv["auto_delete"] = e.isAutoDelete
kv["durable"] = true
kv["type"] = e.exchangeType.String()
kv["arguments"] = e.arguments
_, err := e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409})
if err != nil {
return nil, err
}
return newAmqpExchangeInfo(e.name), nil
}
func (e *AmqpExchange) AutoDelete(isAutoDelete bool) IExchangeSpecification {
e.isAutoDelete = isAutoDelete
return e
}
func (e *AmqpExchange) IsAutoDelete() bool {
return e.isAutoDelete
}
func (e *AmqpExchange) Delete(ctx context.Context) error {
path := exchangePath(e.name)
_, err := e.management.Request(ctx, nil, path, commandDelete, []int{responseCode204})
return err
}
func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) IExchangeSpecification {
e.exchangeType = exchangeType
return e
}
func (e *AmqpExchange) GetExchangeType() TExchangeType {
return e.exchangeType.Type
}
func (e *AmqpExchange) GetName() string {
return e.name
}

View File

@ -0,0 +1,62 @@
package rabbitmq_amqp
import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("AMQP Exchange test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
connection = NewAmqpConnection()
Expect(connection).NotTo(BeNil())
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := connection.Open(context.TODO(), connectionSettings)
Expect(err).To(BeNil())
management = connection.Management()
})
AfterEach(func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Exchange Declare with Default and Delete should success ", func() {
const exchangeName = "AMQP Exchange Declare and Delete with Default should success"
exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
err = exchangeSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
It("AMQP Exchange Declare with Topic and Delete should success ", func() {
const exchangeName = "AMQP Exchange Declare with Topic and Delete should success"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic})
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
err = exchangeSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
It("AMQP Exchange Declare with FanOut and Delete should success ", func() {
const exchangeName = "AMQP Exchange Declare with FanOut and Delete should success"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut})
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
err = exchangeSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
})

View File

@ -20,11 +20,18 @@ type AmqpManagement struct {
cancel context.CancelFunc
}
func (a *AmqpManagement) Binding() IBindingSpecification {
return newAMQPBinding(a)
}
func (a *AmqpManagement) Exchange(exchangeName string) IExchangeSpecification {
return newAmqpExchange(a, exchangeName)
}
func NewAmqpManagement() *AmqpManagement {
return &AmqpManagement{
lifeCycle: NewLifeCycle(),
}
}
func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
@ -116,17 +123,6 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error
if err != nil {
return err
}
//if ctx.Err() != nil {
// // start processing messages. Here we pass a context that will be closed
// // when the receiver session is closed.
// // we won't expose To the user since the user will call Close
// // and the processing _must_ be running in the background
// // for the management session life.
// //err = a.processMessages(context.Background())
// //if err != nil {
// // return err
// //}
//}
a.lifeCycle.SetStatus(Open)
return ctx.Err()
}
@ -148,11 +144,16 @@ func (a *AmqpManagement) Request(ctx context.Context, body any, path string, met
func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error {
if responseCode == responseCode409 {
return PreconditionFailed
}
for _, code := range expectedResponseCodes {
if code == responseCode {
return nil
}
}
return PreconditionFailed
}

View File

@ -61,38 +61,38 @@ func (a *AmqpQueueInfo) GetArguments() map[string]any {
}
type AmqpQueue struct {
management *AmqpManagement
queueArguments map[string]any
isExclusive bool
isAutoDelete bool
name string
management *AmqpManagement
arguments map[string]any
isExclusive bool
isAutoDelete bool
name string
}
func (a *AmqpQueue) DeadLetterExchange(dlx string) IQueueSpecification {
a.queueArguments["x-dead-letter-exchange"] = dlx
a.arguments["x-dead-letter-exchange"] = dlx
return a
}
func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) IQueueSpecification {
a.queueArguments["x-dead-letter-routing-key"] = dlrk
a.arguments["x-dead-letter-routing-key"] = dlrk
return a
}
func (a *AmqpQueue) MaxLengthBytes(length int64) IQueueSpecification {
a.queueArguments["max-length-bytes"] = length
a.arguments["max-length-bytes"] = length
return a
}
func (a *AmqpQueue) QueueType(queueType QueueType) IQueueSpecification {
a.queueArguments["x-queue-type"] = queueType.String()
a.arguments["x-queue-type"] = queueType.String()
return a
}
func (a *AmqpQueue) GetQueueType() TQueueType {
if a.queueArguments["x-queue-type"] == nil {
if a.arguments["x-queue-type"] == nil {
return Classic
}
return TQueueType(a.queueArguments["x-queue-type"].(string))
return TQueueType(a.arguments["x-queue-type"].(string))
}
func (a *AmqpQueue) Exclusive(isExclusive bool) IQueueSpecification {
@ -115,15 +115,15 @@ func (a *AmqpQueue) IsAutoDelete() bool {
func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecification {
return &AmqpQueue{management: management,
name: queueName,
queueArguments: make(map[string]any)}
name: queueName,
arguments: make(map[string]any)}
}
func (a *AmqpQueue) validate() error {
if a.queueArguments["max-length-bytes"] != nil {
if a.arguments["max-length-bytes"] != nil {
err := validatePositive("max length", a.queueArguments["max-length-bytes"].(int64))
err := validatePositive("max length", a.arguments["max-length-bytes"].(int64))
if err != nil {
return err
}
@ -151,8 +151,8 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
kv["durable"] = true
kv["auto_delete"] = a.isAutoDelete
kv["exclusive"] = a.isExclusive
kv["arguments"] = a.queueArguments
response, err := a.management.Request(ctx, kv, path, commandPut, []int{200})
kv["arguments"] = a.arguments
response, err := a.management.Request(ctx, kv, path, commandPut, []int{responseCode200, responseCode409})
if err != nil {
return nil, err
}
@ -161,11 +161,8 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
func (a *AmqpQueue) Delete(ctx context.Context) error {
path := queuePath(a.name)
_, err := a.management.Request(ctx, nil, path, commandDelete, []int{200})
if err != nil {
return err
}
return nil
_, err := a.management.Request(ctx, nil, path, commandDelete, []int{responseCode200})
return err
}
func (a *AmqpQueue) Name(queueName string) IQueueSpecification {

View File

@ -78,6 +78,22 @@ func queuePath(queueName string) string {
return "/" + Queues + "/" + encodePathSegments(queueName)
}
func exchangePath(exchangeName string) string {
return "/" + Exchanges + "/" + encodePathSegments(exchangeName)
}
func bindingPath() string {
return "/" + Bindings
}
func bindingPathWithExchangeQueueKey(exchangeName, queueName, key string) string {
//string path =
//$"/{Consts.Bindings}/src={Utils.EncodePathSegment(_sourceName)};{($"{destinationCharacter}={Utils.EncodePathSegment(_destinationName)};key={Utils.EncodePathSegment(_routingKey)};args=")}";
return fmt.Sprintf("/%s/src=%s;dstq=%s;key=%s;args=", Bindings, encodePathSegments(exchangeName), encodePathSegments(queueName), encodePathSegments(key))
}
func validatePositive(label string, value int64) error {
if value < 0 {
return fmt.Errorf("value for %s must be positive, got %d", label, value)
@ -85,18 +101,6 @@ func validatePositive(label string, value int64) error {
return nil
}
//internal static string GenerateName(string prefix)
//{
//string uuidStr = Guid.NewGuid().ToString();
//byte[] uuidBytes = Encoding.ASCII.GetBytes(uuidStr);
//var md5 = MD5.Create();
//byte[] digest = md5.ComputeHash(uuidBytes);
//return prefix + Convert.ToBase64String(digest)
//.Replace('+', '-')
//.Replace('/', '_')
//.Replace("=", "");
//}
func GenerateNameWithDefaultPrefix() string {
return GenerateName("client.gen-")
}

View File

@ -5,6 +5,18 @@ import (
"crypto/tls"
)
type TSaslMechanism string
const (
Plain TSaslMechanism = "plain"
External TSaslMechanism = "external"
Anonymous TSaslMechanism = "anonymous"
)
type SaslMechanism struct {
Type TSaslMechanism
}
type IConnectionSettings interface {
GetHost() string
Host(hostName string) IConnectionSettings
@ -24,6 +36,8 @@ type IConnectionSettings interface {
BuildAddress() string
TlsConfig(config *tls.Config) IConnectionSettings
GetTlsConfig() *tls.Config
GetSaslMechanism() TSaslMechanism
SaslMechanism(mechanism SaslMechanism) IConnectionSettings
}
type IConnection interface {

View File

@ -34,12 +34,14 @@ type IQueueSpecification interface {
IEntityInfoSpecification[IQueueInfo]
QueueType(queueType QueueType) IQueueSpecification
GetQueueType() TQueueType
MaxLengthBytes(length int64) IQueueSpecification
DeadLetterExchange(dlx string) IQueueSpecification
DeadLetterRoutingKey(dlrk string) IQueueSpecification
}
// IQueueInfo represents the information of a queue
// It is returned by the Declare method of IQueueSpecification
// The information come from the server
type IQueueInfo interface {
GetName() string
IsDurable() bool
@ -50,3 +52,43 @@ type IQueueInfo interface {
GetReplicas() []string
GetArguments() map[string]any
}
type TExchangeType string
const (
Direct TExchangeType = "direct"
Topic TExchangeType = "topic"
FanOut TExchangeType = "fanout"
)
type ExchangeType struct {
Type TExchangeType
}
func (e ExchangeType) String() string {
return string(e.Type)
}
// IExchangeInfo represents the information of an exchange
// It is empty at the moment because the server does not return any information
// We leave it here for future use. In case the server returns information about an exchange
type IExchangeInfo interface {
GetName() string
}
type IExchangeSpecification interface {
GetName() string
AutoDelete(isAutoDelete bool) IExchangeSpecification
IsAutoDelete() bool
IEntityInfoSpecification[IExchangeInfo]
ExchangeType(exchangeType ExchangeType) IExchangeSpecification
GetExchangeType() TExchangeType
}
type IBindingSpecification interface {
SourceExchange(exchangeName string) IBindingSpecification
DestinationQueue(queueName string) IBindingSpecification
Key(bindingKey string) IBindingSpecification
Bind(ctx context.Context) error
Unbind(ctx context.Context) error
}

View File

@ -8,6 +8,8 @@ type IManagement interface {
Open(ctx context.Context, connection IConnection) error
Close(ctx context.Context) error
Queue(queueName string) IQueueSpecification
Exchange(exchangeName string) IExchangeSpecification
Binding() IBindingSpecification
QueueClientName() IQueueSpecification
GetStatus() int
NotifyStatusChange(channel chan *StatusChanged)