API consistency with .NET client

* Prefer using `IQueueSpecification` and `IExchangeSpecification` instead of `string`

For bindings, use strings for source and destination names.

Add sleep so credit is granted from RMQ (HACK)

Add minimal sleep for links to be established

* No need to have exported types and functions in `common.go`

* Simplify functions and remove `PercentCodec` type

Use `fmt.Errorf`

Update modules
This commit is contained in:
Luke Bakken 2024-09-17 09:59:33 -07:00
parent 189408c5bb
commit b9a14aaf38
No known key found for this signature in database
GPG Key ID: D99DE30E43EAE440
17 changed files with 182 additions and 190 deletions

View File

@ -48,7 +48,7 @@ func main() {
} }
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName()) fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName())
bindingSpec := management.Binding().SourceExchange(exchangeInfo.GetName()).DestinationQueue(queueInfo.GetName()).Key("routing-key") bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key")
err = bindingSpec.Bind(context.Background()) err = bindingSpec.Bind(context.Background())
if err != nil { if err != nil {

14
go.mod
View File

@ -3,20 +3,20 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client
go 1.22.0 go 1.22.0
require ( require (
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2 github.com/onsi/gomega v1.34.2
) )
require ( require (
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 // indirect
github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
github.com/google/uuid v1.6.0 // indirect golang.org/x/net v0.29.0 // indirect
golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.25.0 // indirect
golang.org/x/sys v0.24.0 // indirect golang.org/x/text v0.18.0 // indirect
golang.org/x/text v0.17.0 // indirect golang.org/x/tools v0.25.0 // indirect
golang.org/x/tools v0.24.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

22
go.sum
View File

@ -1,7 +1,5 @@
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A= github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A=
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48 h1:etxEtd7qkhJD34gpQesPbZuMJrqkc+ZOXqR3diVfGWs=
github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
@ -12,8 +10,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSFBy+X1V0o+l+8NF1avt4HWl7cA= github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ=
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4=
@ -24,14 +22,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

View File

@ -9,10 +9,11 @@ type AMQPBindingInfo struct {
} }
type AMQPBinding struct { type AMQPBinding struct {
sourceExchangeName string sourceName string
destinationQueue string destinationName string
bindingKey string toQueue bool
management *AmqpManagement bindingKey string
management *AmqpManagement
} }
func newAMQPBinding(management *AmqpManagement) *AMQPBinding { func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
@ -24,31 +25,55 @@ func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification {
return b return b
} }
func (b *AMQPBinding) SourceExchange(exchangeName string) IBindingSpecification { func (b *AMQPBinding) SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification {
b.sourceExchangeName = exchangeName b.sourceName = exchangeSpec.GetName()
b.toQueue = false
return b return b
} }
func (b *AMQPBinding) DestinationQueue(queueName string) IBindingSpecification { func (b *AMQPBinding) SourceExchangeName(exchangeName string) IBindingSpecification {
b.destinationQueue = queueName b.sourceName = exchangeName
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationExchange(exchangeSpec IExchangeInfo) IBindingSpecification {
b.destinationName = exchangeSpec.GetName()
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationExchangeName(exchangeName string) IBindingSpecification {
b.destinationName = exchangeName
b.toQueue = false
return b
}
func (b *AMQPBinding) DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification {
b.destinationName = queueSpec.GetName()
b.toQueue = true
return b
}
func (b *AMQPBinding) DestinationQueueName(queueName string) IBindingSpecification {
b.destinationName = queueName
b.toQueue = true
return b return b
} }
func (b *AMQPBinding) Bind(ctx context.Context) error { func (b *AMQPBinding) Bind(ctx context.Context) error {
path := bindingPath() path := bindingPath()
kv := make(map[string]any) kv := make(map[string]any)
kv["binding_key"] = b.bindingKey kv["binding_key"] = b.bindingKey
kv["source"] = b.sourceExchangeName kv["source"] = b.sourceName
kv["destination_queue"] = b.destinationQueue kv["destination_queue"] = b.destinationName
kv["arguments"] = make(map[string]any) kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204}) _, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
return err return err
} }
func (b *AMQPBinding) Unbind(ctx context.Context) error { func (b *AMQPBinding) Unbind(ctx context.Context) error {
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.sourceExchangeName, b.destinationQueue, b.bindingKey) bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
_, err := b.management.Request(ctx, amqp.Null{}, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204}) _, err := b.management.Request(ctx, amqp.Null{}, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204})
return err return err
} }

View File

@ -7,7 +7,6 @@ import (
) )
var _ = Describe("AMQP Bindings test ", func() { var _ = Describe("AMQP Bindings test ", func() {
var connection IConnection var connection IConnection
var management IManagement var management IManagement
BeforeEach(func() { BeforeEach(func() {
@ -26,9 +25,9 @@ var _ = Describe("AMQP Bindings test ", func() {
Expect(connection.Close(context.Background())).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil())
}) })
It("AMQP Bindings between Exchange and Queue Should success ", func() { It("AMQP Bindings between Exchange and Queue Should succeed", func() {
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success" const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue should uccess"
const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success" const queueName = "Queue_AMQP Bindings between Exchange and Queue should succeed"
exchangeSpec := management.Exchange(exchangeName) exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO()) exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
@ -41,9 +40,7 @@ var _ = Describe("AMQP Bindings test ", func() {
Expect(queueInfo).NotTo(BeNil()) Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.GetName()).To(Equal(queueName))
bindingSpec := management.Binding().SourceExchange(exchangeName). bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key")
DestinationQueue(queueName).
Key("routing-key")
err = bindingSpec.Bind(context.TODO()) err = bindingSpec.Bind(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
err = bindingSpec.Unbind(context.TODO()) err = bindingSpec.Unbind(context.TODO())
@ -52,7 +49,5 @@ var _ = Describe("AMQP Bindings test ", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
err = queueSpec.Delete(context.TODO()) err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })

View File

@ -44,7 +44,6 @@ func (c *ConnectionSettings) Port(port int) IConnectionSettings {
} }
func (c *ConnectionSettings) User(userName string) IConnectionSettings { func (c *ConnectionSettings) User(userName string) IConnectionSettings {
c.user = userName c.user = userName
return c return c
} }
@ -71,7 +70,6 @@ func (c *ConnectionSettings) GetHost() string {
func (c *ConnectionSettings) Host(hostName string) IConnectionSettings { func (c *ConnectionSettings) Host(hostName string) IConnectionSettings {
c.host = hostName c.host = hostName
return c return c
} }
func (c *ConnectionSettings) GetPort() int { func (c *ConnectionSettings) GetPort() int {
@ -170,6 +168,7 @@ func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectio
err = a.Management().Open(ctx, a) err = a.Management().Open(ctx, a)
if err != nil { if err != nil {
// TODO close connection?
return err return err
} }
return nil return nil

View File

@ -8,7 +8,7 @@ import (
) )
var _ = Describe("AMQP Connection Test", func() { var _ = Describe("AMQP Connection Test", func() {
It("AMQP SASLTypeAnonymous Connection should success", func() { It("AMQP SASLTypeAnonymous Connection should succeed", func() {
amqpConnection := NewAmqpConnection() amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@ -17,11 +17,14 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).NotTo(BeNil())
connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous}) connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous})
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())
err = amqpConnection.Close(context.Background())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP SASLTypePlain Connection should success", func() { It("AMQP SASLTypePlain Connection should succeed", func() {
amqpConnection := NewAmqpConnection() amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@ -30,7 +33,10 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.SaslMechanism(SaslMechanism{Type: Plain}) connectionSettings.SaslMechanism(SaslMechanism{Type: Plain})
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())
err = amqpConnection.Close(context.Background())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
@ -42,12 +48,12 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.Host("localhost").Port(1234) connectionSettings.Host("localhost").Port(1234)
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).NotTo(BeNil()) Expect(err).NotTo(BeNil())
}) })
It("AMQP Connection should fail due of wrong host", func() { It("AMQP Connection should fail due of wrong host", func() {
amqpConnection := NewAmqpConnection() amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
@ -56,11 +62,12 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.Host("wronghost").Port(5672) connectionSettings.Host("wronghost").Port(5672)
err := amqpConnection.Open(context.TODO(), connectionSettings)
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).NotTo(BeNil()) Expect(err).NotTo(BeNil())
}) })
It("AMQP Connection should fail due of context cancelled", func() { It("AMQP Connection should fail due to context cancellation", func() {
amqpConnection := NewAmqpConnection() amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).NotTo(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
@ -69,12 +76,12 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(err).NotTo(BeNil()) Expect(err).NotTo(BeNil())
}) })
It("AMQP Connection should receive events ", func() { It("AMQP Connection should receive events", func() {
amqpConnection := NewAmqpConnection() amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).NotTo(BeNil())
ch := make(chan *StatusChanged, 1) ch := make(chan *StatusChanged, 1)
amqpConnection.NotifyStatusChange(ch) amqpConnection.NotifyStatusChange(ch)
err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) err := amqpConnection.Open(context.Background(), NewConnectionSettings())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
recv := <-ch recv := <-ch
Expect(recv).NotTo(BeNil()) Expect(recv).NotTo(BeNil())
@ -88,7 +95,6 @@ var _ = Describe("AMQP Connection Test", func() {
Expect(recv.From).To(Equal(Open)) Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed)) Expect(recv.To).To(Equal(Closed))
}) })
//It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() { //It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() {
@ -103,8 +109,7 @@ var _ = Describe("AMQP Connection Test", func() {
// }) // })
// Expect(connectionSettings).NotTo(BeNil()) // Expect(connectionSettings).NotTo(BeNil())
// Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) // Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
// err := amqpConnection.Open(context.TODO(), connectionSettings) // err := amqpConnection.Open(context.Background(), connectionSettings)
// Expect(err).To(BeNil()) // Expect(err).To(BeNil())
//}) //})
}) })

View File

@ -34,7 +34,6 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
} }
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) { func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
path := exchangePath(e.name) path := exchangePath(e.name)
kv := make(map[string]any) kv := make(map[string]any)
kv["auto_delete"] = e.isAutoDelete kv["auto_delete"] = e.isAutoDelete

View File

@ -7,7 +7,6 @@ import (
) )
var _ = Describe("AMQP Exchange test ", func() { var _ = Describe("AMQP Exchange test ", func() {
var connection IConnection var connection IConnection
var management IManagement var management IManagement
BeforeEach(func() { BeforeEach(func() {
@ -26,8 +25,8 @@ var _ = Describe("AMQP Exchange test ", func() {
Expect(connection.Close(context.Background())).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil())
}) })
It("AMQP Exchange Declare with Default and Delete should success ", func() { It("AMQP Exchange Declare with Default and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare and Delete with Default should success" const exchangeName = "AMQP Exchange Declare and Delete with Default should succeed"
exchangeSpec := management.Exchange(exchangeName) exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO()) exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
@ -37,8 +36,8 @@ var _ = Describe("AMQP Exchange test ", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP Exchange Declare with Topic and Delete should success ", func() { It("AMQP Exchange Declare with Topic and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with Topic and Delete should success" const exchangeName = "AMQP Exchange Declare with Topic and Delete should succeed"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic}) exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic})
exchangeInfo, err := exchangeSpec.Declare(context.TODO()) exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
@ -48,8 +47,8 @@ var _ = Describe("AMQP Exchange test ", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP Exchange Declare with FanOut and Delete should success ", func() { It("AMQP Exchange Declare with FanOut and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with FanOut and Delete should success" const exchangeName = "AMQP Exchange Declare with FanOut and Delete should succeed"
exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut}) exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut})
exchangeInfo, err := exchangeSpec.Declare(context.TODO()) exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
@ -58,5 +57,4 @@ var _ = Describe("AMQP Exchange test ", func() {
err = exchangeSpec.Delete(context.TODO()) err = exchangeSpec.Delete(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })

View File

@ -10,7 +10,7 @@ import (
"time" "time"
) )
var PreconditionFailed = errors.New("precondition Failed") var ErrPreconditionFailed = errors.New("precondition Failed")
type AmqpManagement struct { type AmqpManagement struct {
session *amqp.Session session *amqp.Session
@ -58,28 +58,6 @@ func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
return nil return nil
} }
//func (a *AmqpManagement) processMessages(ctx context.Context) error {
//
// go func() {
//
// for a.GetStatus() == Open {
// msg, err := a.receiver.Receive(ctx, nil) // blocking call
// if err != nil {
// fmt.Printf("Exiting processMessages %s\n", err)
// return
// }
//
// if msg != nil {
// a.receiver.AcceptMessage(ctx, msg)
// }
// }
//
// fmt.Printf("Exiting processMessages\n")
// }()
//return nil
//}
func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error { func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error {
if a.sender == nil { if a.sender == nil {
prop := make(map[string]any) prop := make(map[string]any)
@ -110,19 +88,24 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error
if err != nil { if err != nil {
return err return err
} }
a.session = session a.session = session
err = a.ensureSenderLink(ctx) err = a.ensureSenderLink(ctx)
if err != nil { if err != nil {
return err return err
} }
time.Sleep(500 * time.Millisecond)
err = a.ensureReceiverLink(ctx) err = a.ensureReceiverLink(ctx)
time.Sleep(500 * time.Millisecond)
if err != nil { if err != nil {
return err return err
} }
// TODO
// Even 10ms is enough to allow the links to establish,
// which tells me it allows the golang runtime to process
// some channels or I/O or something elsewhere
time.Sleep(time.Millisecond * 10)
a.lifeCycle.SetStatus(Open) a.lifeCycle.SetStatus(Open)
return ctx.Err() return ctx.Err()
} }
@ -137,15 +120,12 @@ func (a *AmqpManagement) Close(ctx context.Context) error {
func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string,
expectedResponseCodes []int) (map[string]any, error) { expectedResponseCodes []int) (map[string]any, error) {
return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes) return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes)
} }
func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error { func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error {
if responseCode == responseCode409 { if responseCode == responseCode409 {
return PreconditionFailed return ErrPreconditionFailed
} }
for _, code := range expectedResponseCodes { for _, code := range expectedResponseCodes {
@ -154,7 +134,7 @@ func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponse
} }
} }
return errors.New(fmt.Sprintf("expected response code %d got %d", expectedResponseCodes, responseCode)) return fmt.Errorf("expected response code %d got %d", expectedResponseCodes, responseCode)
} }
func (a *AmqpManagement) request(ctx context.Context, id string, body any, path string, method string, func (a *AmqpManagement) request(ctx context.Context, id string, body any, path string, method string,
@ -162,6 +142,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
amqpMessage := &amqp.Message{ amqpMessage := &amqp.Message{
Value: body, Value: body,
} }
s := commandReplyTo s := commandReplyTo
amqpMessage.Properties = &amqp.MessageProperties{ amqpMessage.Properties = &amqp.MessageProperties{
ReplyTo: &s, ReplyTo: &s,
@ -169,19 +150,24 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
Subject: &method, Subject: &method,
MessageID: &id, MessageID: &id,
} }
opts := &amqp.SendOptions{Settled: true} opts := &amqp.SendOptions{Settled: true}
err := a.sender.Send(ctx, amqpMessage, opts) err := a.sender.Send(ctx, amqpMessage, opts)
if err != nil { if err != nil {
return make(map[string]any), err return make(map[string]any), err
} }
msg, err := a.receiver.Receive(ctx, nil) msg, err := a.receiver.Receive(ctx, nil)
if err != nil { if err != nil {
return make(map[string]any), err return make(map[string]any), err
} }
err = a.receiver.AcceptMessage(ctx, msg) err = a.receiver.AcceptMessage(ctx, msg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if msg.Properties == nil { if msg.Properties == nil {
return make(map[string]any), fmt.Errorf("expected properties in the message") return make(map[string]any), fmt.Errorf("expected properties in the message")
} }
@ -193,6 +179,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
if msg.Properties.CorrelationID != id { if msg.Properties.CorrelationID != id {
return make(map[string]any), fmt.Errorf("expected correlation id %s got %s", id, msg.Properties.CorrelationID) return make(map[string]any), fmt.Errorf("expected correlation id %s got %s", id, msg.Properties.CorrelationID)
} }
switch msg.Value.(type) { switch msg.Value.(type) {
case map[string]interface{}: case map[string]interface{}:
return msg.Value.(map[string]any), nil return msg.Value.(map[string]any), nil

View File

@ -8,8 +8,7 @@ import (
) )
var _ = Describe("Management tests", func() { var _ = Describe("Management tests", func() {
It("AMQP Management should fail due to context cancellation", func() {
It("AMQP Management should fail due of context cancelled", func() {
amqpConnection := NewAmqpConnection() amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).NotTo(BeNil())
err := amqpConnection.Open(context.Background(), NewConnectionSettings()) err := amqpConnection.Open(context.Background(), NewConnectionSettings())
@ -19,14 +18,15 @@ var _ = Describe("Management tests", func() {
cancel() cancel()
err = amqpConnection.Management().Open(ctx, amqpConnection) err = amqpConnection.Management().Open(ctx, amqpConnection)
Expect(err).NotTo(BeNil()) Expect(err).NotTo(BeNil())
amqpConnection.Close(context.Background())
}) })
It("AMQP Management should receive events ", func() { It("AMQP Management should receive events", func() {
amqpConnection := NewAmqpConnection() amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).NotTo(BeNil())
ch := make(chan *StatusChanged, 1) ch := make(chan *StatusChanged, 1)
amqpConnection.Management().NotifyStatusChange(ch) amqpConnection.Management().NotifyStatusChange(ch)
err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) err := amqpConnection.Open(context.Background(), NewConnectionSettings())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
recv := <-ch recv := <-ch
Expect(recv).NotTo(BeNil()) Expect(recv).NotTo(BeNil())
@ -40,7 +40,7 @@ var _ = Describe("Management tests", func() {
Expect(recv.From).To(Equal(Open)) Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed)) Expect(recv.To).To(Equal(Closed))
amqpConnection.Close(context.Background())
}) })
It("Request", func() { It("Request", func() {
@ -51,7 +51,7 @@ var _ = Describe("Management tests", func() {
connectionSettings := NewConnectionSettings() connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.TODO(), connectionSettings) err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
management := amqpConnection.Management() management := amqpConnection.Management()
@ -62,9 +62,10 @@ var _ = Describe("Management tests", func() {
_queueArguments["x-queue-type"] = "quorum" _queueArguments["x-queue-type"] = "quorum"
kv["arguments"] = _queueArguments kv["arguments"] = _queueArguments
path := "/queues/test" path := "/queues/test"
result, err := management.Request(context.TODO(), kv, path, "PUT", []int{200}) result, err := management.Request(context.Background(), kv, path, "PUT", []int{200})
Expect(err).To(BeNil()) Expect(err).To(BeNil())
Expect(result).NotTo(BeNil()) Expect(result).NotTo(BeNil())
Expect(management.Close(context.TODO())).To(BeNil()) Expect(management.Close(context.Background())).To(BeNil())
amqpConnection.Close(context.Background())
}) })
}) })

View File

@ -45,7 +45,7 @@ func (a *AmqpQueueInfo) IsAutoDelete() bool {
return a.isAutoDelete return a.isAutoDelete
} }
func (a *AmqpQueueInfo) Exclusive() bool { func (a *AmqpQueueInfo) IsExclusive() bool {
return a.isExclusive return a.isExclusive
} }
@ -121,9 +121,7 @@ func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecificat
} }
func (a *AmqpQueue) validate() error { func (a *AmqpQueue) validate() error {
if a.arguments["max-length-bytes"] != nil { if a.arguments["max-length-bytes"] != nil {
err := validatePositive("max length", a.arguments["max-length-bytes"].(int64)) err := validatePositive("max length", a.arguments["max-length-bytes"].(int64))
if err != nil { if err != nil {
return err return err
@ -133,18 +131,18 @@ func (a *AmqpQueue) validate() error {
} }
func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) {
if Quorum == a.GetQueueType() || if Quorum == a.GetQueueType() ||
Stream == a.GetQueueType() { Stream == a.GetQueueType() {
// mandatory arguments for quorum queues and streams // mandatory arguments for quorum queues and streams
a.Exclusive(false).AutoDelete(false) a.Exclusive(false).AutoDelete(false)
} }
if err := a.validate(); err != nil { if err := a.validate(); err != nil {
return nil, err return nil, err
} }
if a.name == "" { if a.name == "" {
a.name = GenerateNameWithDefaultPrefix() a.name = generateNameWithDefaultPrefix()
} }
path := queuePath(a.name) path := queuePath(a.name)

View File

@ -7,7 +7,6 @@ import (
) )
var _ = Describe("AMQP Queue test ", func() { var _ = Describe("AMQP Queue test ", func() {
var connection IConnection var connection IConnection
var management IManagement var management IManagement
BeforeEach(func() { BeforeEach(func() {
@ -27,8 +26,8 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(connection.Close(context.Background())).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil())
}) })
It("AMQP Queue Declare With Response and Delete should success ", func() { It("AMQP Queue Declare With Response and Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Response and Delete should success" const queueName = "AMQP Queue Declare With Response and Delete should succeed"
queueSpec := management.Queue(queueName) queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO()) queueInfo, err := queueSpec.Declare(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
@ -36,14 +35,14 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.Exclusive()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Classic)) Expect(queueInfo.Type()).To(Equal(Classic))
err = queueSpec.Delete(context.TODO()) err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP Queue Declare With Parameters and Delete should success ", func() { It("AMQP Queue Declare With Parameters and Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Parameters and Delete should success" const queueName = "AMQP Queue Declare With Parameters and Delete should succeed"
queueSpec := management.Queue(queueName).Exclusive(true). queueSpec := management.Queue(queueName).Exclusive(true).
AutoDelete(true). AutoDelete(true).
QueueType(QueueType{Classic}). QueueType(QueueType{Classic}).
@ -56,7 +55,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeTrue())
Expect(queueInfo.Exclusive()).To(BeTrue()) Expect(queueInfo.IsExclusive()).To(BeTrue())
Expect(queueInfo.Type()).To(Equal(Classic)) Expect(queueInfo.Type()).To(Equal(Classic))
Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit")) Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit"))
Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0)) Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0))
@ -69,8 +68,8 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP Declare Quorum Queue and Delete should success ", func() { It("AMQP Declare Quorum Queue and Delete should succeed", func() {
const queueName = "AMQP Declare Quorum Queue and Delete should success" const queueName = "AMQP Declare Quorum Queue and Delete should succeed"
// Quorum queue will ignore Exclusive and AutoDelete settings // Quorum queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues // since they are not supported by quorum queues
queueSpec := management.Queue(queueName). queueSpec := management.Queue(queueName).
@ -82,14 +81,14 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.Exclusive()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Quorum)) Expect(queueInfo.Type()).To(Equal(Quorum))
err = queueSpec.Delete(context.TODO()) err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP Declare Stream Queue and Delete should success ", func() { It("AMQP Declare Stream Queue and Delete should succeed", func() {
const queueName = "AMQP Declare Stream Queue and Delete should success" const queueName = "AMQP Declare Stream Queue and Delete should succeed"
// Stream queue will ignore Exclusive and AutoDelete settings // Stream queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues // since they are not supported by quorum queues
queueSpec := management.Queue(queueName). queueSpec := management.Queue(queueName).
@ -101,13 +100,13 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.GetName()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.Exclusive()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Stream)) Expect(queueInfo.Type()).To(Equal(Stream))
err = queueSpec.Delete(context.TODO()) err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP Declare Queue with invalid type should fail ", func() { It("AMQP Declare Queue with invalid type should fail", func() {
const queueName = "AMQP Declare Queue with invalid type should fail" const queueName = "AMQP Declare Queue with invalid type should fail"
queueSpec := management.Queue(queueName). queueSpec := management.Queue(queueName).
QueueType(QueueType{Type: "invalid"}) QueueType(QueueType{Type: "invalid"})
@ -115,8 +114,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).NotTo(BeNil()) Expect(err).NotTo(BeNil())
}) })
It("AMQP Declare Queue should fail with Precondition fail ", func() { It("AMQP Declare Queue should fail with Precondition fail", func() {
// The first queue is declared as Classic and it should succeed // The first queue is declared as Classic and it should succeed
// The second queue is declared as Quorum and it should fail since it is already declared as Classic // The second queue is declared as Quorum and it should fail since it is already declared as Classic
const queueName = "AMQP Declare Queue should fail with Precondition fail" const queueName = "AMQP Declare Queue should fail with Precondition fail"
@ -126,13 +124,12 @@ var _ = Describe("AMQP Queue test ", func() {
queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum}) queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum})
_, err = queueSpecFail.Declare(context.TODO()) _, err = queueSpecFail.Declare(context.TODO())
Expect(err).NotTo(BeNil()) Expect(err).NotTo(BeNil())
Expect(err).To(Equal(PreconditionFailed)) Expect(err).To(Equal(ErrPreconditionFailed))
err = queueSpec.Delete(context.TODO()) err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
It("AMQP Declare Queue should fail during validation", func() { It("AMQP Declare Queue should fail during validation", func() {
const queueName = "AMQP Declare Queue should fail during validation" const queueName = "AMQP Declare Queue should fail during validation"
queueSpec := management.Queue(queueName).MaxLengthBytes(-1) queueSpec := management.Queue(queueName).MaxLengthBytes(-1)
_, err := queueSpec.Declare(context.TODO()) _, err := queueSpec.Declare(context.TODO())
@ -149,5 +146,4 @@ var _ = Describe("AMQP Queue test ", func() {
err = queueSpec.Delete(context.TODO()) err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })

View File

@ -9,10 +9,26 @@ import (
"strings" "strings"
) )
type PercentCodec struct{} const (
responseCode200 = 200
responseCode201 = 201
responseCode204 = 204
responseCode409 = 409
commandPut = "PUT"
commandGet = "GET"
commandPost = "POST"
commandDelete = "DELETE"
commandReplyTo = "$me"
managementNodeAddress = "/management"
linkPairName = "management-link-pair"
exchanges = "exchanges"
key = "key"
queues = "queues"
bindings = "bindings"
)
// Encode takes a string and returns its percent-encoded representation. // encodePathSegments takes a string and returns its percent-encoded representation.
func (pc *PercentCodec) Encode(input string) string { func encodePathSegments(input string) string {
var encoded strings.Builder var encoded strings.Builder
// Iterate over each character in the input string // Iterate over each character in the input string
@ -30,7 +46,7 @@ func (pc *PercentCodec) Encode(input string) string {
} }
// Decode takes a percent-encoded string and returns its decoded representation. // Decode takes a percent-encoded string and returns its decoded representation.
func (pc *PercentCodec) Decode(input string) (string, error) { func decode(input string) (string, error) {
// Use url.QueryUnescape which properly decodes percent-encoded strings // Use url.QueryUnescape which properly decodes percent-encoded strings
decoded, err := url.QueryUnescape(input) decoded, err := url.QueryUnescape(input)
if err != nil { if err != nil {
@ -40,27 +56,6 @@ func (pc *PercentCodec) Decode(input string) (string, error) {
return decoded, nil return decoded, nil
} }
const (
responseCode200 = 200
responseCode201 = 201
responseCode204 = 204
responseCode409 = 409
commandPut = "PUT"
commandGet = "GET"
commandPost = "POST"
commandDelete = "DELETE"
commandReplyTo = "$me"
managementNodeAddress = "/management"
linkPairName = "management-link-pair"
)
const (
Exchanges = "exchanges"
Key = "key"
Queues = "queues"
Bindings = "bindings"
)
// isUnreserved checks if a character is an unreserved character in percent encoding // isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ // Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
func isUnreserved(char rune) bool { func isUnreserved(char rune) bool {
@ -70,27 +65,28 @@ func isUnreserved(char rune) bool {
char == '-' || char == '.' || char == '_' || char == '~' char == '-' || char == '.' || char == '_' || char == '~'
} }
func encodePathSegments(pathSegments string) string {
return (&PercentCodec{}).Encode(pathSegments)
}
func queuePath(queueName string) string { func queuePath(queueName string) string {
return "/" + Queues + "/" + encodePathSegments(queueName) return "/" + queues + "/" + encodePathSegments(queueName)
} }
func exchangePath(exchangeName string) string { func exchangePath(exchangeName string) string {
return "/" + Exchanges + "/" + encodePathSegments(exchangeName) return "/" + exchanges + "/" + encodePathSegments(exchangeName)
} }
func bindingPath() string { func bindingPath() string {
return "/" + Bindings return "/" + bindings
} }
func bindingPathWithExchangeQueueKey(exchangeName, queueName, key string) string { func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string {
//string path = sourceNameEncoded := encodePathSegments(sourceName)
//$"/{Consts.Bindings}/src={Utils.EncodePathSegment(_sourceName)};{($"{destinationCharacter}={Utils.EncodePathSegment(_destinationName)};key={Utils.EncodePathSegment(_routingKey)};args=")}"; destinationNameEncoded := encodePathSegments(destinationName)
keyEncoded := encodePathSegments(key)
return fmt.Sprintf("/%s/src=%s;dstq=%s;key=%s;args=", Bindings, encodePathSegments(exchangeName), encodePathSegments(queueName), encodePathSegments(key)) destinationType := "dste"
if toQueue {
destinationType = "dstq"
}
format := "/%s/src=%s;%s=%s;key=%s;args="
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
} }
@ -101,21 +97,19 @@ func validatePositive(label string, value int64) error {
return nil return nil
} }
func GenerateNameWithDefaultPrefix() string { func generateNameWithDefaultPrefix() string {
return GenerateName("client.gen-") return generateName("client.gen-")
} }
// GenerateName generates a unique name with the given prefix // generateName generates a unique name with the given prefix
func GenerateName(prefix string) string { func generateName(prefix string) string {
uid := uuid.New()
var uid = uuid.New() uuidBytes := []byte(uid.String())
var uuidBytes = []byte(uid.String()) md5obj := md5.New()
var _md5 = md5.New() digest := md5obj.Sum(uuidBytes)
var digest = _md5.Sum(uuidBytes)
result := base64.StdEncoding.EncodeToString(digest) result := base64.StdEncoding.EncodeToString(digest)
result = strings.ReplaceAll(result, "+", "-") result = strings.ReplaceAll(result, "+", "-")
result = strings.ReplaceAll(result, "/", "_") result = strings.ReplaceAll(result, "/", "_")
result = strings.ReplaceAll(result, "=", "") result = strings.ReplaceAll(result, "=", "")
return prefix + result return prefix + result
} }

View File

@ -1,7 +1,6 @@
package rabbitmq_amqp package rabbitmq_amqp
import ( import (
"errors"
"fmt" "fmt"
"regexp" "regexp"
"strconv" "strconv"
@ -47,7 +46,7 @@ func CapacityFrom(value string) (int64, error) {
match, err := regexp.Compile("^((kb|mb|gb|tb))") match, err := regexp.Compile("^((kb|mb|gb|tb))")
if err != nil { if err != nil {
return 0, return 0,
errors.New(fmt.Sprintf("Capacity, invalid unit size format:%s", value)) fmt.Errorf("Capacity, invalid unit size format:%s", value)
} }
foundUnitSize := strings.ToLower(value[len(value)-2:]) foundUnitSize := strings.ToLower(value[len(value)-2:])
@ -56,7 +55,7 @@ func CapacityFrom(value string) (int64, error) {
size, err := strconv.Atoi(value[:len(value)-2]) size, err := strconv.Atoi(value[:len(value)-2])
if err != nil { if err != nil {
return 0, errors.New(fmt.Sprintf("Capacity, Invalid number format: %s", value)) return 0, fmt.Errorf("Capacity, Invalid number format: %s", value)
} }
switch foundUnitSize { switch foundUnitSize {
case UnitKb: case UnitKb:
@ -71,9 +70,7 @@ func CapacityFrom(value string) (int64, error) {
case UnitTb: case UnitTb:
return CapacityTB(int64(size)), nil return CapacityTB(int64(size)), nil
} }
} }
return 0, return 0, fmt.Errorf("Capacity, Invalid unit size format: %s", value)
errors.New(fmt.Sprintf("Capacity, Invalid unit size format: %s", value))
} }

View File

@ -7,8 +7,7 @@ import (
) )
var _ = Describe("Converters", func() { var _ = Describe("Converters", func() {
It("Converts from number", func() {
It("Converter from number", func() {
Expect(CapacityBytes(100)).To(Equal(int64(100))) Expect(CapacityBytes(100)).To(Equal(int64(100)))
Expect(CapacityKB(1)).To(Equal(int64(1000))) Expect(CapacityKB(1)).To(Equal(int64(1000)))
Expect(CapacityMB(1)).To(Equal(int64(1000 * 1000))) Expect(CapacityMB(1)).To(Equal(int64(1000 * 1000)))
@ -16,7 +15,7 @@ var _ = Describe("Converters", func() {
Expect(CapacityTB(1)).To(Equal(int64(1000 * 1000 * 1000 * 1000))) Expect(CapacityTB(1)).To(Equal(int64(1000 * 1000 * 1000 * 1000)))
}) })
It("Converter from string", func() { It("Converts from string", func() {
v, err := CapacityFrom("1KB") v, err := CapacityFrom("1KB")
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal(int64(1000))) Expect(v).To(Equal(int64(1000)))
@ -34,7 +33,7 @@ var _ = Describe("Converters", func() {
Expect(v).To(Equal(int64(1000 * 1000 * 1000 * 1000))) Expect(v).To(Equal(int64(1000 * 1000 * 1000 * 1000)))
}) })
It("Converter from string logError", func() { It("Converts from string logError", func() {
v, err := CapacityFrom("10LL") v, err := CapacityFrom("10LL")
Expect(fmt.Sprintf("%s", err)). Expect(fmt.Sprintf("%s", err)).
To(ContainSubstring("Invalid unit size format")) To(ContainSubstring("Invalid unit size format"))
@ -51,5 +50,4 @@ var _ = Describe("Converters", func() {
Expect(v).To(Equal(int64(0))) Expect(v).To(Equal(int64(0)))
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })

View File

@ -46,7 +46,7 @@ type IQueueInfo interface {
GetName() string GetName() string
IsDurable() bool IsDurable() bool
IsAutoDelete() bool IsAutoDelete() bool
Exclusive() bool IsExclusive() bool
Type() TQueueType Type() TQueueType
GetLeader() string GetLeader() string
GetReplicas() []string GetReplicas() []string
@ -86,8 +86,10 @@ type IExchangeSpecification interface {
} }
type IBindingSpecification interface { type IBindingSpecification interface {
SourceExchange(exchangeName string) IBindingSpecification SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification
DestinationQueue(queueName string) IBindingSpecification SourceExchangeName(exchangeName string) IBindingSpecification
DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification
DestinationQueueName(queueName string) IBindingSpecification
Key(bindingKey string) IBindingSpecification Key(bindingKey string) IBindingSpecification
Bind(ctx context.Context) error Bind(ctx context.Context) error
Unbind(ctx context.Context) error Unbind(ctx context.Context) error