Merge pull request #12 from rabbitmq/rabbitmq-amqp-go-client-3

API consistency with .NET client
This commit is contained in:
Luke Bakken 2024-09-24 07:17:49 -07:00 committed by GitHub
commit 6d8aaeb2b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 182 additions and 190 deletions

View File

@ -48,7 +48,7 @@ func main() {
}
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName())
bindingSpec := management.Binding().SourceExchange(exchangeInfo.GetName()).DestinationQueue(queueInfo.GetName()).Key("routing-key")
bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key")
err = bindingSpec.Bind(context.Background())
if err != nil {

14
go.mod
View File

@ -3,20 +3,20 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client
go 1.22.0
require (
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
)
require (
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect
github.com/google/uuid v1.6.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/tools v0.24.0 // indirect
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.25.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

22
go.sum
View File

@ -1,7 +1,5 @@
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A=
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48 h1:etxEtd7qkhJD34gpQesPbZuMJrqkc+ZOXqR3diVfGWs=
github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
@ -12,8 +10,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSFBy+X1V0o+l+8NF1avt4HWl7cA=
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ=
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4=
@ -24,14 +22,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

View File

@ -9,10 +9,11 @@ type AMQPBindingInfo struct {
}
type AMQPBinding struct {
sourceExchangeName string
destinationQueue string
bindingKey string
management *AmqpManagement
sourceName string
destinationName string
toQueue bool
bindingKey string
management *AmqpManagement
}
func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
@ -24,31 +25,55 @@ func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification {
return b
}
func (b *AMQPBinding) SourceExchange(exchangeName string) IBindingSpecification {
b.sourceExchangeName = exchangeName
func (b *AMQPBinding) SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification {
b.sourceName = exchangeSpec.GetName()
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationQueue(queueName string) IBindingSpecification {
b.destinationQueue = queueName
func (b *AMQPBinding) SourceExchangeName(exchangeName string) IBindingSpecification {
b.sourceName = exchangeName
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationExchange(exchangeSpec IExchangeInfo) IBindingSpecification {
b.destinationName = exchangeSpec.GetName()
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationExchangeName(exchangeName string) IBindingSpecification {
b.destinationName = exchangeName
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification {
b.destinationName = queueSpec.GetName()
b.toQueue = true
return b
}
func (b *AMQPBinding) DestinationQueueName(queueName string) IBindingSpecification {
b.destinationName = queueName
b.toQueue = true
return b
}
func (b *AMQPBinding) Bind(ctx context.Context) error {
path := bindingPath()
kv := make(map[string]any)
kv["binding_key"] = b.bindingKey
kv["source"] = b.sourceExchangeName
kv["destination_queue"] = b.destinationQueue
kv["source"] = b.sourceName
kv["destination_queue"] = b.destinationName
kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
return err
}
func (b *AMQPBinding) Unbind(ctx context.Context) error {
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.sourceExchangeName, b.destinationQueue, b.bindingKey)
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
_, err := b.management.Request(ctx, amqp.Null{}, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204})
return err
}

View File

@ -7,7 +7,6 @@ import (
)
var _ = Describe("AMQP Bindings test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
@ -26,9 +25,9 @@ var _ = Describe("AMQP Bindings test ", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Bindings between Exchange and Queue Should success ", func() {
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success"
const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success"
It("AMQP Bindings between Exchange and Queue Should succeed", func() {
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue should uccess"
const queueName = "Queue_AMQP Bindings between Exchange and Queue should succeed"
exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
@ -41,9 +40,7 @@ var _ = Describe("AMQP Bindings test ", func() {
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))
bindingSpec := management.Binding().SourceExchange(exchangeName).
DestinationQueue(queueName).
Key("routing-key")
bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key")
err = bindingSpec.Bind(context.TODO())
Expect(err).To(BeNil())
err = bindingSpec.Unbind(context.TODO())
@ -52,7 +49,5 @@ var _ = Describe("AMQP Bindings test ", func() {
Expect(err).To(BeNil())
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
})

View File

@ -44,7 +44,6 @@ func (c *ConnectionSettings) Port(port int) IConnectionSettings {
}
func (c *ConnectionSettings) User(userName string) IConnectionSettings {
c.user = userName
return c
}
@ -71,7 +70,6 @@ func (c *ConnectionSettings) GetHost() string {
func (c *ConnectionSettings) Host(hostName string) IConnectionSettings {
c.host = hostName
return c
}
func (c *ConnectionSettings) GetPort() int {
@ -170,6 +168,7 @@ func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectio
err = a.Management().Open(ctx, a)
if err != nil {
// TODO close connection?
return err
}
return nil

View File

@ -8,7 +8,7 @@ import (
)
var _ = Describe("AMQP Connection Test", func() {
It("AMQP SASLTypeAnonymous Connection should success", func() {
It("AMQP SASLTypeAnonymous Connection should succeed", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@ -17,11 +17,14 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil())
connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous})
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())
err = amqpConnection.Close(context.Background())
Expect(err).To(BeNil())
})
It("AMQP SASLTypePlain Connection should success", func() {
It("AMQP SASLTypePlain Connection should succeed", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@ -30,7 +33,10 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.SaslMechanism(SaslMechanism{Type: Plain})
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())
err = amqpConnection.Close(context.Background())
Expect(err).To(BeNil())
})
@ -42,12 +48,12 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.Host("localhost").Port(1234)
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fail due of wrong host", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@ -56,11 +62,12 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.Host("wronghost").Port(5672)
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should fail due of context cancelled", func() {
It("AMQP Connection should fail due to context cancellation", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
@ -69,12 +76,12 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(err).NotTo(BeNil())
})
It("AMQP Connection should receive events ", func() {
It("AMQP Connection should receive events", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
ch := make(chan *StatusChanged, 1)
amqpConnection.NotifyStatusChange(ch)
err := amqpConnection.Open(context.TODO(), NewConnectionSettings())
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
Expect(err).To(BeNil())
recv := <-ch
Expect(recv).NotTo(BeNil())
@ -88,7 +95,6 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
})
//It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() {
@ -103,8 +109,7 @@ var _ = Describe("AMQP Connection Test", func() {
// })
// Expect(connectionSettings).NotTo(BeNil())
// Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
// err := amqpConnection.Open(context.TODO(), connectionSettings)
// err := amqpConnection.Open(context.Background(), connectionSettings)
// Expect(err).To(BeNil())
//})
})

View File

@ -34,7 +34,6 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
}
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
path := exchangePath(e.name)
kv := make(map[string]any)
kv["auto_delete"] = e.isAutoDelete

View File

@ -7,7 +7,6 @@ import (
)
var _ = Describe("AMQP Exchange test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
@ -26,8 +25,8 @@ var _ = Describe("AMQP Exchange test ", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Exchange Declare with Default and Delete should success ", func() {
const exchangeName = "AMQP Exchange Declare and Delete with Default should success"
It("AMQP Exchange Declare with Default and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare and Delete with Default should succeed"
exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
@ -37,8 +36,8 @@ var _ = Describe("AMQP Exchange test ", func() {
Expect(err).To(BeNil())
})
It("AMQP Exchange Declare with Topic and Delete should success ", func() {
const exchangeName = "AMQP Exchange Declare with Topic and Delete should success"
It("AMQP Exchange Declare with Topic and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with Topic and Delete should succeed"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic})
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
@ -48,8 +47,8 @@ var _ = Describe("AMQP Exchange test ", func() {
Expect(err).To(BeNil())
})
It("AMQP Exchange Declare with FanOut and Delete should success ", func() {
const exchangeName = "AMQP Exchange Declare with FanOut and Delete should success"
It("AMQP Exchange Declare with FanOut and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with FanOut and Delete should succeed"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut})
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
@ -58,5 +57,4 @@ var _ = Describe("AMQP Exchange test ", func() {
err = exchangeSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
})

View File

@ -10,7 +10,7 @@ import (
"time"
)
var PreconditionFailed = errors.New("precondition Failed")
var ErrPreconditionFailed = errors.New("precondition Failed")
type AmqpManagement struct {
session *amqp.Session
@ -58,28 +58,6 @@ func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
return nil
}
//func (a *AmqpManagement) processMessages(ctx context.Context) error {
//
// go func() {
//
// for a.GetStatus() == Open {
// msg, err := a.receiver.Receive(ctx, nil) // blocking call
// if err != nil {
// fmt.Printf("Exiting processMessages %s\n", err)
// return
// }
//
// if msg != nil {
// a.receiver.AcceptMessage(ctx, msg)
// }
// }
//
// fmt.Printf("Exiting processMessages\n")
// }()
//return nil
//}
func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error {
if a.sender == nil {
prop := make(map[string]any)
@ -110,19 +88,24 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error
if err != nil {
return err
}
a.session = session
err = a.ensureSenderLink(ctx)
if err != nil {
return err
}
time.Sleep(500 * time.Millisecond)
err = a.ensureReceiverLink(ctx)
time.Sleep(500 * time.Millisecond)
if err != nil {
return err
}
// TODO
// Even 10ms is enough to allow the links to establish,
// which tells me it allows the golang runtime to process
// some channels or I/O or something elsewhere
time.Sleep(time.Millisecond * 10)
a.lifeCycle.SetStatus(Open)
return ctx.Err()
}
@ -137,15 +120,12 @@ func (a *AmqpManagement) Close(ctx context.Context) error {
func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string,
expectedResponseCodes []int) (map[string]any, error) {
return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes)
}
func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error {
if responseCode == responseCode409 {
return PreconditionFailed
return ErrPreconditionFailed
}
for _, code := range expectedResponseCodes {
@ -154,7 +134,7 @@ func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponse
}
}
return errors.New(fmt.Sprintf("expected response code %d got %d", expectedResponseCodes, responseCode))
return fmt.Errorf("expected response code %d got %d", expectedResponseCodes, responseCode)
}
func (a *AmqpManagement) request(ctx context.Context, id string, body any, path string, method string,
@ -162,6 +142,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
amqpMessage := &amqp.Message{
Value: body,
}
s := commandReplyTo
amqpMessage.Properties = &amqp.MessageProperties{
ReplyTo: &s,
@ -169,19 +150,24 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
Subject: &method,
MessageID: &id,
}
opts := &amqp.SendOptions{Settled: true}
err := a.sender.Send(ctx, amqpMessage, opts)
if err != nil {
return make(map[string]any), err
}
msg, err := a.receiver.Receive(ctx, nil)
if err != nil {
return make(map[string]any), err
}
err = a.receiver.AcceptMessage(ctx, msg)
if err != nil {
return nil, err
}
if msg.Properties == nil {
return make(map[string]any), fmt.Errorf("expected properties in the message")
}
@ -193,6 +179,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
if msg.Properties.CorrelationID != id {
return make(map[string]any), fmt.Errorf("expected correlation id %s got %s", id, msg.Properties.CorrelationID)
}
switch msg.Value.(type) {
case map[string]interface{}:
return msg.Value.(map[string]any), nil

View File

@ -8,8 +8,7 @@ import (
)
var _ = Describe("Management tests", func() {
It("AMQP Management should fail due of context cancelled", func() {
It("AMQP Management should fail due to context cancellation", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
@ -19,14 +18,15 @@ var _ = Describe("Management tests", func() {
cancel()
err = amqpConnection.Management().Open(ctx, amqpConnection)
Expect(err).NotTo(BeNil())
amqpConnection.Close(context.Background())
})
It("AMQP Management should receive events ", func() {
It("AMQP Management should receive events", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
ch := make(chan *StatusChanged, 1)
amqpConnection.Management().NotifyStatusChange(ch)
err := amqpConnection.Open(context.TODO(), NewConnectionSettings())
err := amqpConnection.Open(context.Background(), NewConnectionSettings())
Expect(err).To(BeNil())
recv := <-ch
Expect(recv).NotTo(BeNil())
@ -40,7 +40,7 @@ var _ = Describe("Management tests", func() {
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
amqpConnection.Close(context.Background())
})
It("Request", func() {
@ -51,7 +51,7 @@ var _ = Describe("Management tests", func() {
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())
management := amqpConnection.Management()
@ -62,9 +62,10 @@ var _ = Describe("Management tests", func() {
_queueArguments["x-queue-type"] = "quorum"
kv["arguments"] = _queueArguments
path := "/queues/test"
result, err := management.Request(context.TODO(), kv, path, "PUT", []int{200})
result, err := management.Request(context.Background(), kv, path, "PUT", []int{200})
Expect(err).To(BeNil())
Expect(result).NotTo(BeNil())
Expect(management.Close(context.TODO())).To(BeNil())
Expect(management.Close(context.Background())).To(BeNil())
amqpConnection.Close(context.Background())
})
})

View File

@ -45,7 +45,7 @@ func (a *AmqpQueueInfo) IsAutoDelete() bool {
return a.isAutoDelete
}
func (a *AmqpQueueInfo) Exclusive() bool {
func (a *AmqpQueueInfo) IsExclusive() bool {
return a.isExclusive
}
@ -121,9 +121,7 @@ func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecificat
}
func (a *AmqpQueue) validate() error {
if a.arguments["max-length-bytes"] != nil {
err := validatePositive("max length", a.arguments["max-length-bytes"].(int64))
if err != nil {
return err
@ -133,18 +131,18 @@ func (a *AmqpQueue) validate() error {
}
func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
if Quorum == a.GetQueueType() ||
Stream == a.GetQueueType() {
// mandatory arguments for quorum queues and streams
a.Exclusive(false).AutoDelete(false)
}
if err := a.validate(); err != nil {
return nil, err
}
if a.name == "" {
a.name = GenerateNameWithDefaultPrefix()
a.name = generateNameWithDefaultPrefix()
}
path := queuePath(a.name)

View File

@ -7,7 +7,6 @@ import (
)
var _ = Describe("AMQP Queue test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
@ -27,8 +26,8 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Queue Declare With Response and Delete should success ", func() {
const queueName = "AMQP Queue Declare With Response and Delete should success"
It("AMQP Queue Declare With Response and Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Response and Delete should succeed"
queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO())
Expect(err).To(BeNil())
@ -36,14 +35,14 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.Exclusive()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Classic))
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
It("AMQP Queue Declare With Parameters and Delete should success ", func() {
const queueName = "AMQP Queue Declare With Parameters and Delete should success"
It("AMQP Queue Declare With Parameters and Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Parameters and Delete should succeed"
queueSpec := management.Queue(queueName).Exclusive(true).
AutoDelete(true).
QueueType(QueueType{Classic}).
@ -56,7 +55,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeTrue())
Expect(queueInfo.Exclusive()).To(BeTrue())
Expect(queueInfo.IsExclusive()).To(BeTrue())
Expect(queueInfo.Type()).To(Equal(Classic))
Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit"))
Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0))
@ -69,8 +68,8 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).To(BeNil())
})
It("AMQP Declare Quorum Queue and Delete should success ", func() {
const queueName = "AMQP Declare Quorum Queue and Delete should success"
It("AMQP Declare Quorum Queue and Delete should succeed", func() {
const queueName = "AMQP Declare Quorum Queue and Delete should succeed"
// Quorum queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
queueSpec := management.Queue(queueName).
@ -82,14 +81,14 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.Exclusive()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Quorum))
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
It("AMQP Declare Stream Queue and Delete should success ", func() {
const queueName = "AMQP Declare Stream Queue and Delete should success"
It("AMQP Declare Stream Queue and Delete should succeed", func() {
const queueName = "AMQP Declare Stream Queue and Delete should succeed"
// Stream queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
queueSpec := management.Queue(queueName).
@ -101,13 +100,13 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.Exclusive()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Stream))
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
It("AMQP Declare Queue with invalid type should fail ", func() {
It("AMQP Declare Queue with invalid type should fail", func() {
const queueName = "AMQP Declare Queue with invalid type should fail"
queueSpec := management.Queue(queueName).
QueueType(QueueType{Type: "invalid"})
@ -115,8 +114,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).NotTo(BeNil())
})
It("AMQP Declare Queue should fail with Precondition fail ", func() {
It("AMQP Declare Queue should fail with Precondition fail", func() {
// The first queue is declared as Classic and it should succeed
// The second queue is declared as Quorum and it should fail since it is already declared as Classic
const queueName = "AMQP Declare Queue should fail with Precondition fail"
@ -126,13 +124,12 @@ var _ = Describe("AMQP Queue test ", func() {
queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum})
_, err = queueSpecFail.Declare(context.TODO())
Expect(err).NotTo(BeNil())
Expect(err).To(Equal(PreconditionFailed))
Expect(err).To(Equal(ErrPreconditionFailed))
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
It("AMQP Declare Queue should fail during validation", func() {
const queueName = "AMQP Declare Queue should fail during validation"
queueSpec := management.Queue(queueName).MaxLengthBytes(-1)
_, err := queueSpec.Declare(context.TODO())
@ -149,5 +146,4 @@ var _ = Describe("AMQP Queue test ", func() {
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
})

View File

@ -9,10 +9,26 @@ import (
"strings"
)
type PercentCodec struct{}
const (
responseCode200 = 200
responseCode201 = 201
responseCode204 = 204
responseCode409 = 409
commandPut = "PUT"
commandGet = "GET"
commandPost = "POST"
commandDelete = "DELETE"
commandReplyTo = "$me"
managementNodeAddress = "/management"
linkPairName = "management-link-pair"
exchanges = "exchanges"
key = "key"
queues = "queues"
bindings = "bindings"
)
// Encode takes a string and returns its percent-encoded representation.
func (pc *PercentCodec) Encode(input string) string {
// encodePathSegments takes a string and returns its percent-encoded representation.
func encodePathSegments(input string) string {
var encoded strings.Builder
// Iterate over each character in the input string
@ -30,7 +46,7 @@ func (pc *PercentCodec) Encode(input string) string {
}
// Decode takes a percent-encoded string and returns its decoded representation.
func (pc *PercentCodec) Decode(input string) (string, error) {
func decode(input string) (string, error) {
// Use url.QueryUnescape which properly decodes percent-encoded strings
decoded, err := url.QueryUnescape(input)
if err != nil {
@ -40,27 +56,6 @@ func (pc *PercentCodec) Decode(input string) (string, error) {
return decoded, nil
}
const (
responseCode200 = 200
responseCode201 = 201
responseCode204 = 204
responseCode409 = 409
commandPut = "PUT"
commandGet = "GET"
commandPost = "POST"
commandDelete = "DELETE"
commandReplyTo = "$me"
managementNodeAddress = "/management"
linkPairName = "management-link-pair"
)
const (
Exchanges = "exchanges"
Key = "key"
Queues = "queues"
Bindings = "bindings"
)
// isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
func isUnreserved(char rune) bool {
@ -70,27 +65,28 @@ func isUnreserved(char rune) bool {
char == '-' || char == '.' || char == '_' || char == '~'
}
func encodePathSegments(pathSegments string) string {
return (&PercentCodec{}).Encode(pathSegments)
}
func queuePath(queueName string) string {
return "/" + Queues + "/" + encodePathSegments(queueName)
return "/" + queues + "/" + encodePathSegments(queueName)
}
func exchangePath(exchangeName string) string {
return "/" + Exchanges + "/" + encodePathSegments(exchangeName)
return "/" + exchanges + "/" + encodePathSegments(exchangeName)
}
func bindingPath() string {
return "/" + Bindings
return "/" + bindings
}
func bindingPathWithExchangeQueueKey(exchangeName, queueName, key string) string {
//string path =
//$"/{Consts.Bindings}/src={Utils.EncodePathSegment(_sourceName)};{($"{destinationCharacter}={Utils.EncodePathSegment(_destinationName)};key={Utils.EncodePathSegment(_routingKey)};args=")}";
return fmt.Sprintf("/%s/src=%s;dstq=%s;key=%s;args=", Bindings, encodePathSegments(exchangeName), encodePathSegments(queueName), encodePathSegments(key))
func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string {
sourceNameEncoded := encodePathSegments(sourceName)
destinationNameEncoded := encodePathSegments(destinationName)
keyEncoded := encodePathSegments(key)
destinationType := "dste"
if toQueue {
destinationType = "dstq"
}
format := "/%s/src=%s;%s=%s;key=%s;args="
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
}
@ -101,21 +97,19 @@ func validatePositive(label string, value int64) error {
return nil
}
func GenerateNameWithDefaultPrefix() string {
return GenerateName("client.gen-")
func generateNameWithDefaultPrefix() string {
return generateName("client.gen-")
}
// GenerateName generates a unique name with the given prefix
func GenerateName(prefix string) string {
var uid = uuid.New()
var uuidBytes = []byte(uid.String())
var _md5 = md5.New()
var digest = _md5.Sum(uuidBytes)
// generateName generates a unique name with the given prefix
func generateName(prefix string) string {
uid := uuid.New()
uuidBytes := []byte(uid.String())
md5obj := md5.New()
digest := md5obj.Sum(uuidBytes)
result := base64.StdEncoding.EncodeToString(digest)
result = strings.ReplaceAll(result, "+", "-")
result = strings.ReplaceAll(result, "/", "_")
result = strings.ReplaceAll(result, "=", "")
return prefix + result
}

View File

@ -1,7 +1,6 @@
package rabbitmq_amqp
import (
"errors"
"fmt"
"regexp"
"strconv"
@ -47,7 +46,7 @@ func CapacityFrom(value string) (int64, error) {
match, err := regexp.Compile("^((kb|mb|gb|tb))")
if err != nil {
return 0,
errors.New(fmt.Sprintf("Capacity, invalid unit size format:%s", value))
fmt.Errorf("Capacity, invalid unit size format:%s", value)
}
foundUnitSize := strings.ToLower(value[len(value)-2:])
@ -56,7 +55,7 @@ func CapacityFrom(value string) (int64, error) {
size, err := strconv.Atoi(value[:len(value)-2])
if err != nil {
return 0, errors.New(fmt.Sprintf("Capacity, Invalid number format: %s", value))
return 0, fmt.Errorf("Capacity, Invalid number format: %s", value)
}
switch foundUnitSize {
case UnitKb:
@ -71,9 +70,7 @@ func CapacityFrom(value string) (int64, error) {
case UnitTb:
return CapacityTB(int64(size)), nil
}
}
return 0,
errors.New(fmt.Sprintf("Capacity, Invalid unit size format: %s", value))
return 0, fmt.Errorf("Capacity, Invalid unit size format: %s", value)
}

View File

@ -7,8 +7,7 @@ import (
)
var _ = Describe("Converters", func() {
It("Converter from number", func() {
It("Converts from number", func() {
Expect(CapacityBytes(100)).To(Equal(int64(100)))
Expect(CapacityKB(1)).To(Equal(int64(1000)))
Expect(CapacityMB(1)).To(Equal(int64(1000 * 1000)))
@ -16,7 +15,7 @@ var _ = Describe("Converters", func() {
Expect(CapacityTB(1)).To(Equal(int64(1000 * 1000 * 1000 * 1000)))
})
It("Converter from string", func() {
It("Converts from string", func() {
v, err := CapacityFrom("1KB")
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal(int64(1000)))
@ -34,7 +33,7 @@ var _ = Describe("Converters", func() {
Expect(v).To(Equal(int64(1000 * 1000 * 1000 * 1000)))
})
It("Converter from string logError", func() {
It("Converts from string logError", func() {
v, err := CapacityFrom("10LL")
Expect(fmt.Sprintf("%s", err)).
To(ContainSubstring("Invalid unit size format"))
@ -51,5 +50,4 @@ var _ = Describe("Converters", func() {
Expect(v).To(Equal(int64(0)))
Expect(err).To(BeNil())
})
})

View File

@ -46,7 +46,7 @@ type IQueueInfo interface {
GetName() string
IsDurable() bool
IsAutoDelete() bool
Exclusive() bool
IsExclusive() bool
Type() TQueueType
GetLeader() string
GetReplicas() []string
@ -86,8 +86,10 @@ type IExchangeSpecification interface {
}
type IBindingSpecification interface {
SourceExchange(exchangeName string) IBindingSpecification
DestinationQueue(queueName string) IBindingSpecification
SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification
SourceExchangeName(exchangeName string) IBindingSpecification
DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification
DestinationQueueName(queueName string) IBindingSpecification
Key(bindingKey string) IBindingSpecification
Bind(ctx context.Context) error
Unbind(ctx context.Context) error