diff --git a/Makefile b/Makefile index 2fec76d..fd70c2c 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,10 @@ +# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) +ifeq (,$(shell go env GOBIN)) +GOBIN = $(shell go env GOPATH)/bin +else +GOBIN = $(shell go env GOBIN) +endif + all: test format: @@ -6,7 +13,18 @@ format: vet: go vet ./pkg/rabbitmqamqp -test: format vet +STATICCHECK ?= $(GOBIN)/staticcheck +STATICCHECK_VERSION ?= latest +$(STATICCHECK): + go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION) +check: $(STATICCHECK) + $(STATICCHECK) ./pkg/rabbitmqamqp + + + + + +test: format vet check cd ./pkg/rabbitmqamqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \ --randomize-all --randomize-suites \ --cover --coverprofile=coverage.txt --covermode=atomic \ diff --git a/pkg/rabbitmqamqp/address.go b/pkg/rabbitmqamqp/address.go index 6c08c38..d2744c1 100644 --- a/pkg/rabbitmqamqp/address.go +++ b/pkg/rabbitmqamqp/address.go @@ -61,9 +61,9 @@ func address(exchange, key, queue *string, urlParameters *string) (string, error if !isStringNilOrEmpty(exchange) { if !isStringNilOrEmpty(key) { - return "/" + exchanges + "/" + encodePathSegments(*exchange) + "/" + encodePathSegments(*key) + urlAppend, nil + return fmt.Sprintf("/%s/%s/%s%s", exchanges, encodePathSegments(*exchange), encodePathSegments(*key), urlAppend), nil } - return "/" + exchanges + "/" + encodePathSegments(*exchange) + urlAppend, nil + return fmt.Sprintf("/%s/%s%s", exchanges, encodePathSegments(*exchange), urlAppend), nil } if queue == nil { @@ -73,8 +73,7 @@ func address(exchange, key, queue *string, urlParameters *string) (string, error if isStringNilOrEmpty(queue) { return "", errors.New("queue must be set") } - - return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil + return fmt.Sprintf("/%s/%s%s", queues, encodePathSegments(*queue), urlAppend), nil } // exchangeAddress Creates the address for the exchange diff --git a/pkg/rabbitmqamqp/address_test.go b/pkg/rabbitmqamqp/address_test.go index 4a0f114..193c469 100644 --- a/pkg/rabbitmqamqp/address_test.go +++ b/pkg/rabbitmqamqp/address_test.go @@ -6,59 +6,62 @@ import ( ) var _ = Describe("address builder test ", func() { - It("With exchange, queue and key should raise and error", func() { - queue := "my_queue" - exchange := "my_exchange" - - _, err := address(&exchange, nil, &queue, nil) - Expect(err).NotTo(BeNil()) - Expect(err.Error()).To(Equal("exchange and queue cannot be set together")) + // Error cases + Describe("Error cases", func() { + DescribeTable("should return appropriate errors", + func(exchange, key, queue *string, expectedErr string) { + _, err := address(exchange, key, queue, nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal(expectedErr)) + }, + Entry("when both exchange and queue are set", + stringPtr("my_exchange"), nil, stringPtr("my_queue"), + "exchange and queue cannot be set together"), + Entry("when neither exchange nor queue is set", + nil, nil, nil, + "exchange or queue must be set"), + ) }) - It("Without exchange and queue should raise and error", func() { - _, err := address(nil, nil, nil, nil) - Expect(err).NotTo(BeNil()) - Expect(err.Error()).To(Equal("exchange or queue must be set")) + // Exchange-related cases + Describe("Exchange addresses", func() { + DescribeTable("should generate correct exchange addresses", + func(exchange, key *string, expected string) { + address, err := address(exchange, key, nil, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(address).To(Equal(expected)) + }, + Entry("with exchange and key", + stringPtr("my_exchange"), stringPtr("my_key"), + "/exchanges/my_exchange/my_key"), + Entry("with exchange only", + stringPtr("my_exchange"), nil, + "/exchanges/my_exchange"), + Entry("with special characters", + stringPtr("my_ exchange/()"), stringPtr("my_key "), + "/exchanges/my_%20exchange%2F%28%29/my_key%20"), + ) }) - It("With exchange and key should return address", func() { - exchange := "my_exchange" - key := "my_key" + // Queue-related cases + Describe("Queue addresses", func() { + It("should generate correct queue address with special characters", func() { + queue := "my_queue>" + address, err := address(nil, nil, &queue, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(address).To(Equal("/queues/my_queue%3E")) + }) - address, err := address(&exchange, &key, nil, nil) - Expect(err).To(BeNil()) - Expect(address).To(Equal("/exchanges/my_exchange/my_key")) + It("should generate correct purge queue address", func() { + queue := "my_queue" + address, err := purgeQueueAddress(&queue) + Expect(err).NotTo(HaveOccurred()) + Expect(address).To(Equal("/queues/my_queue/messages")) + }) }) - - It("With exchange should return address", func() { - exchange := "my_exchange" - address, err := address(&exchange, nil, nil, nil) - Expect(err).To(BeNil()) - Expect(address).To(Equal("/exchanges/my_exchange")) - }) - - It("With exchange and key with names to encode should return the encoded address", func() { - - exchange := "my_ exchange/()" - key := "my_key " - - address, err := address(&exchange, &key, nil, nil) - Expect(err).To(BeNil()) - Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20")) - }) - - It("With queue should return address", func() { - queue := "my_queue>" - address, err := address(nil, nil, &queue, nil) - Expect(err).To(BeNil()) - Expect(address).To(Equal("/queues/my_queue%3E")) - }) - - It("With queue and urlParameters should return address", func() { - queue := "my_queue" - address, err := purgeQueueAddress(&queue) - Expect(err).To(BeNil()) - Expect(address).To(Equal("/queues/my_queue/messages")) - }) - }) + +// Helper function to create string pointers +func stringPtr(s string) *string { + return &s +} diff --git a/pkg/rabbitmqamqp/amqp_connection.go b/pkg/rabbitmqamqp/amqp_connection.go index a3f1056..59a05c6 100644 --- a/pkg/rabbitmqamqp/amqp_connection.go +++ b/pkg/rabbitmqamqp/amqp_connection.go @@ -228,19 +228,18 @@ func (a *AmqpConnection) open(ctx context.Context, addresses []string, connOptio var err error a.session, err = a.azureConnection.NewSession(ctx, nil) go func() { - select { - case <-azureConnection.Done(): - { - a.lifeCycle.SetState(&StateClosed{error: azureConnection.Err()}) - if azureConnection.Err() != nil { - Error("connection closed unexpectedly", "error", azureConnection.Err()) - a.maybeReconnect() + <-azureConnection.Done() + { + a.lifeCycle.SetState(&StateClosed{error: azureConnection.Err()}) + if azureConnection.Err() != nil { + Error("connection closed unexpectedly", "error", azureConnection.Err()) + a.maybeReconnect() - return - } - Debug("connection closed successfully") + return } + Debug("connection closed successfully") } + }() if err != nil { @@ -261,67 +260,74 @@ func (a *AmqpConnection) maybeReconnect() { return } a.lifeCycle.SetState(&StateReconnecting{}) - numberOfAttempts := 1 - waitTime := a.amqpConnOptions.RecoveryConfiguration.BackOffReconnectInterval - reconnected := false - for numberOfAttempts <= a.amqpConnOptions.RecoveryConfiguration.MaxReconnectAttempts { + // Add exponential backoff with jitter + baseDelay := a.amqpConnOptions.RecoveryConfiguration.BackOffReconnectInterval + maxDelay := 1 * time.Minute + + for attempt := 1; attempt <= a.amqpConnOptions.RecoveryConfiguration.MaxReconnectAttempts; attempt++ { + ///wait for before reconnecting // add some random milliseconds to the wait time to avoid thundering herd // the random time is between 0 and 500 milliseconds - waitTime = waitTime + time.Duration(rand.Intn(500))*time.Millisecond + // Calculate delay with exponential backoff and jitter + jitter := time.Duration(rand.Intn(500)) * time.Millisecond + delay := baseDelay + jitter + if delay > maxDelay { + delay = maxDelay + } - Info("Waiting before reconnecting", "in", waitTime, "attempt", numberOfAttempts) - time.Sleep(waitTime) + Info("Attempting reconnection", "attempt", attempt, "delay", delay) + time.Sleep(delay) // context with timeout ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) // try to createSender err := a.open(ctx, a.amqpConnOptions.addresses, a.amqpConnOptions) cancel() - if err != nil { - numberOfAttempts++ - waitTime = waitTime * 2 - Error("Failed to connection. ", "id", a.Id(), "error", err) - } else { - reconnected = true - break + if err == nil { + a.restartEntities() + a.lifeCycle.SetState(&StateOpen{}) + return } + baseDelay *= 2 + Error("Reconnection attempt failed", "attempt", attempt, "error", err) } - if reconnected { - var fails int32 - Info("Reconnected successfully, restarting publishers and consumers") - a.entitiesTracker.publishers.Range(func(key, value any) bool { - publisher := value.(*Publisher) - // try to createSender - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - err := publisher.createSender(ctx) - if err != nil { - atomic.AddInt32(&fails, 1) - Error("Failed to createSender publisher", "ID", publisher.Id(), "error", err) - } - cancel() - return true - }) - Info("Restarted publishers", "number of fails", fails) - fails = 0 - a.entitiesTracker.consumers.Range(func(key, value any) bool { - consumer := value.(*Consumer) - // try to createSender - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - err := consumer.createReceiver(ctx) - if err != nil { - atomic.AddInt32(&fails, 1) - Error("Failed to createReceiver consumer", "ID", consumer.Id(), "error", err) - } - cancel() - return true - }) - Info("Restarted consumers", "number of fails", fails) +} - a.lifeCycle.SetState(&StateOpen{}) - } +// restartEntities attempts to restart all publishers and consumers after a reconnection +func (a *AmqpConnection) restartEntities() { + var publisherFails, consumerFails int32 + // Restart publishers + a.entitiesTracker.publishers.Range(func(key, value any) bool { + publisher := value.(*Publisher) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := publisher.createSender(ctx); err != nil { + atomic.AddInt32(&publisherFails, 1) + Error("Failed to restart publisher", "ID", publisher.Id(), "error", err) + } + return true + }) + + // Restart consumers + a.entitiesTracker.consumers.Range(func(key, value any) bool { + consumer := value.(*Consumer) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + if err := consumer.createReceiver(ctx); err != nil { + atomic.AddInt32(&consumerFails, 1) + Error("Failed to restart consumer", "ID", consumer.Id(), "error", err) + } + return true + }) + + Info("Entity restart complete", + "publisherFails", publisherFails, + "consumerFails", consumerFails) } func (a *AmqpConnection) close() { diff --git a/pkg/rabbitmqamqp/amqp_connection_recovery.go b/pkg/rabbitmqamqp/amqp_connection_recovery.go index ae22678..c427359 100644 --- a/pkg/rabbitmqamqp/amqp_connection_recovery.go +++ b/pkg/rabbitmqamqp/amqp_connection_recovery.go @@ -53,14 +53,6 @@ func (e *entitiesTracker) storeOrReplaceProducer(entity entityIdentifier) { e.publishers.Store(entity.Id(), entity) } -func (e *entitiesTracker) getProducer(id string) (*Publisher, bool) { - producer, ok := e.publishers.Load(id) - if !ok { - return nil, false - } - return producer.(*Publisher), true -} - func (e *entitiesTracker) removeProducer(entity entityIdentifier) { e.publishers.Delete(entity.Id()) } @@ -69,14 +61,6 @@ func (e *entitiesTracker) storeOrReplaceConsumer(entity entityIdentifier) { e.consumers.Store(entity.Id(), entity) } -func (e *entitiesTracker) getConsumer(id string) (*Consumer, bool) { - consumer, ok := e.consumers.Load(id) - if !ok { - return nil, false - } - return consumer.(*Consumer), true -} - func (e *entitiesTracker) removeConsumer(entity entityIdentifier) { e.consumers.Delete(entity.Id()) } diff --git a/pkg/rabbitmqamqp/amqp_connection_recovery_test.go b/pkg/rabbitmqamqp/amqp_connection_recovery_test.go index 067de7b..f0ac8e9 100644 --- a/pkg/rabbitmqamqp/amqp_connection_recovery_test.go +++ b/pkg/rabbitmqamqp/amqp_connection_recovery_test.go @@ -2,11 +2,12 @@ package rabbitmqamqp import ( "context" + "time" + "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" testhelper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper" - "time" ) var _ = Describe("Recovery connection test", func() { @@ -45,6 +46,7 @@ var _ = Describe("Recovery connection test", func() { consumer, err := connection.NewConsumer(context.Background(), qName, nil) + Expect(err).To(BeNil()) publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{ Queue: qName, @@ -113,6 +115,23 @@ var _ = Describe("Recovery connection test", func() { // from reconnecting to open // from open to closed (without error) Expect(err).To(BeNil()) + + Expect(consumer.Close(context.Background())).NotTo(BeNil()) + Expect(publisher.Close(context.Background())).NotTo(BeNil()) + + entLen := 0 + connection.entitiesTracker.consumers.Range(func(key, value interface{}) bool { + entLen++ + return true + }) + Expect(entLen).To(Equal(0)) + + entLen = 0 + connection.entitiesTracker.publishers.Range(func(key, value interface{}) bool { + entLen++ + return true + }) + Expect(entLen).To(Equal(0)) }) It("connection should not reconnect producers and consumers if the auto-recovery is disabled", func() { diff --git a/pkg/rabbitmqamqp/amqp_connection_test.go b/pkg/rabbitmqamqp/amqp_connection_test.go index 5659bb7..7dfef05 100644 --- a/pkg/rabbitmqamqp/amqp_connection_test.go +++ b/pkg/rabbitmqamqp/amqp_connection_test.go @@ -10,7 +10,6 @@ import ( var _ = Describe("AMQP connection Test", func() { It("AMQP SASLTypeAnonymous connection should succeed", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous()}) Expect(err).To(BeNil()) @@ -72,6 +71,64 @@ var _ = Describe("AMQP connection Test", func() { Expect(recv.To).To(Equal(&StateClosed{})) }) + It("Entity tracker should be aligned with consumers and publishers ", func() { + connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + SASLType: amqp.SASLTypeAnonymous()}) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + + queueName := generateNameWithDateTime("Entity tracker should be aligned with consumers and publishers") + + _, err = connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ + Name: queueName, + }) + + Expect(err).To(BeNil()) + publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: queueName}, "test") + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) + consumer, err := connection.NewConsumer(context.Background(), queueName, nil) + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + // check the entity tracker + Expect(connection.entitiesTracker).NotTo(BeNil()) + entLen := 0 + connection.entitiesTracker.consumers.Range(func(key, value interface{}) bool { + entLen++ + return true + }) + Expect(entLen).To(Equal(1)) + + entLen = 0 + connection.entitiesTracker.publishers.Range(func(key, value interface{}) bool { + entLen++ + return true + }) + Expect(entLen).To(Equal(1)) + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(publisher.Close(context.Background())).To(BeNil()) + + entLen = 0 + connection.entitiesTracker.consumers.Range(func(key, value interface{}) bool { + entLen++ + return true + }) + Expect(entLen).To(Equal(0)) + + entLen = 0 + connection.entitiesTracker.publishers.Range(func(key, value interface{}) bool { + entLen++ + return true + }) + Expect(entLen).To(Equal(0)) + + err = connection.Management().DeleteQueue(context.Background(), queueName) + Expect(err).To(BeNil()) + + Expect(connection.Close(context.Background())).To(BeNil()) + + }) + //It("AMQP TLS connection should success with SASLTypeAnonymous ", func() { // amqpConnection := NewAmqpConnection() // Expect(amqpConnection).NotTo(BeNil()) diff --git a/pkg/rabbitmqamqp/amqp_consumer.go b/pkg/rabbitmqamqp/amqp_consumer.go index 3d36e6a..ecb6863 100644 --- a/pkg/rabbitmqamqp/amqp_consumer.go +++ b/pkg/rabbitmqamqp/amqp_consumer.go @@ -25,17 +25,23 @@ func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error { return dc.receiver.RejectMessage(ctx, dc.message, e) } -func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error { +// copyAnnotations helper function to copy annotations +func copyAnnotations(annotations amqp.Annotations) (amqp.Annotations, error) { if err := validateMessageAnnotations(annotations); err != nil { + return nil, err + } + destination := make(amqp.Annotations) + for key, value := range annotations { + destination[key] = value + } + return destination, nil +} + +func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error { + destination, err := copyAnnotations(annotations) + if err != nil { return err } - // copy the rabbitmq annotations to amqp annotations - destination := make(amqp.Annotations) - for keyA, value := range annotations { - destination[keyA] = value - - } - return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{ DeliveryFailed: true, UndeliverableHere: true, @@ -48,15 +54,10 @@ func (dc *DeliveryContext) Requeue(ctx context.Context) error { } func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error { - if err := validateMessageAnnotations(annotations); err != nil { + destination, err := copyAnnotations(annotations) + if err != nil { return err } - // copy the rabbitmq annotations to amqp annotations - destination := make(amqp.Annotations) - for key, value := range annotations { - destination[key] = value - - } return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{ DeliveryFailed: false, UndeliverableHere: false, @@ -144,5 +145,6 @@ func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) { } func (c *Consumer) Close(ctx context.Context) error { + c.connection.entitiesTracker.removeConsumer(c) return c.receiver.Load().Close(ctx) } diff --git a/pkg/rabbitmqamqp/amqp_consumer_stream_test.go b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go index b366320..10066d6 100644 --- a/pkg/rabbitmqamqp/amqp_consumer_stream_test.go +++ b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go @@ -73,7 +73,7 @@ var _ = Describe("Consumer stream test", func() { dc, err := consumerOffsetValue.Receive(context.Background()) Expect(err).To(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i+5))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i+5))) Expect(dc.Accept(context.Background())).To(BeNil()) } @@ -88,7 +88,7 @@ var _ = Describe("Consumer stream test", func() { dc, err := consumerFirst.Receive(context.Background()) Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) Expect(dc.Accept(context.Background())).To(BeNil()) } @@ -107,7 +107,7 @@ var _ = Describe("Consumer stream test", func() { dc, err := consumerLast.Receive(context.Background()) Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).NotTo(Equal(fmt.Sprintf("Message #%d", 0))) + Expect(string(dc.Message().GetData())).NotTo(Equal(fmt.Sprintf("Message #%d", 0))) Expect(dc.Accept(context.Background())).To(BeNil()) consumerNext, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ @@ -125,7 +125,7 @@ var _ = Describe("Consumer stream test", func() { dc, err = consumerNext.Receive(context.Background()) Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal("the next message")) + Expect(string(dc.Message().GetData())).To(Equal("the next message")) Expect(dc.Accept(context.Background())).To(BeNil()) signal <- struct{}{} }() @@ -179,7 +179,7 @@ var _ = Describe("Consumer stream test", func() { dc, err := consumer.Receive(context.Background()) Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) Expect(dc.Accept(context.Background())).To(BeNil()) } @@ -200,7 +200,7 @@ var _ = Describe("Consumer stream test", func() { dc, err := consumer.Receive(context.Background()) Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) } Expect(consumer.Close(context.Background())).To(BeNil()) @@ -235,7 +235,7 @@ var _ = Describe("Consumer stream test", func() { dc, err := consumerBanana.Receive(context.Background()) Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i))) Expect(dc.Accept(context.Background())).To(BeNil()) } @@ -254,7 +254,7 @@ var _ = Describe("Consumer stream test", func() { dc, err := consumerApple.Receive(context.Background()) Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i))) Expect(dc.Accept(context.Background())).To(BeNil()) } @@ -273,9 +273,9 @@ var _ = Describe("Consumer stream test", func() { Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) if i < 10 { - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i))) } else { - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i-10))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i-10))) } Expect(dc.Accept(context.Background())).To(BeNil()) } @@ -296,10 +296,9 @@ var _ = Describe("Consumer stream test", func() { Expect(err).To(BeNil()) Expect(dc.Message()).NotTo(BeNil()) if i < 10 { - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i))) - + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i))) } else { - Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf(" #%d", i-10))) + Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf(" #%d", i-10))) } Expect(dc.Accept(context.Background())).To(BeNil()) } diff --git a/pkg/rabbitmqamqp/amqp_consumer_test.go b/pkg/rabbitmqamqp/amqp_consumer_test.go index af46ac3..40945a0 100644 --- a/pkg/rabbitmqamqp/amqp_consumer_test.go +++ b/pkg/rabbitmqamqp/amqp_consumer_test.go @@ -81,6 +81,7 @@ var _ = Describe("NewConsumer tests", func() { Expect(consumer.Close(context.Background())).To(BeNil()) Expect(err).To(BeNil()) nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) + Expect(err).To(BeNil()) Expect(nMessages).To(Equal(1)) Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil()) @@ -116,6 +117,7 @@ var _ = Describe("NewConsumer tests", func() { Expect(consumer.Close(context.Background())).To(BeNil()) Expect(err).To(BeNil()) nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) + Expect(err).To(BeNil()) Expect(nMessages).To(Equal(1)) Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil()) @@ -153,6 +155,7 @@ var _ = Describe("NewConsumer tests", func() { Info: nil, })).To(BeNil()) nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) + Expect(err).To(BeNil()) Expect(nMessages).To(Equal(0)) Expect(consumer.Close(context.Background())).To(BeNil()) Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) diff --git a/pkg/rabbitmqamqp/amqp_management.go b/pkg/rabbitmqamqp/amqp_management.go index e638141..e9d5114 100644 --- a/pkg/rabbitmqamqp/amqp_management.go +++ b/pkg/rabbitmqamqp/amqp_management.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" - "github.com/Azure/go-amqp" - "github.com/google/uuid" "strconv" "time" + + "github.com/Azure/go-amqp" + "github.com/google/uuid" ) var ErrPreconditionFailed = errors.New("precondition Failed") @@ -22,7 +23,6 @@ type AmqpManagement struct { sender *amqp.Sender receiver *amqp.Receiver lifeCycle *LifeCycle - cancel context.CancelFunc } func NewAmqpManagement() *AmqpManagement { diff --git a/pkg/rabbitmqamqp/amqp_publisher_test.go b/pkg/rabbitmqamqp/amqp_publisher_test.go index 91c54ba..54ed6d8 100644 --- a/pkg/rabbitmqamqp/amqp_publisher_test.go +++ b/pkg/rabbitmqamqp/amqp_publisher_test.go @@ -91,6 +91,7 @@ var _ = Describe("AMQP publisher ", func() { err = connection.management.DeleteQueue(context.Background(), qName) Expect(err).To(BeNil()) publishResult, err = publisher.Publish(context.Background(), NewMessage([]byte("hello"))) + Expect(publishResult).To(BeNil()) Expect(err).NotTo(BeNil()) Expect(connection.Close(context.Background())) }) @@ -146,6 +147,8 @@ var _ = Describe("AMQP publisher ", func() { Name: name, IsAutoDelete: false, }) + Expect(err).To(BeNil()) + msg = NewMessage([]byte("hello")) Expect(MessagePropertyToAddress(msg, &ExchangeAddress{Exchange: name})).To(BeNil()) // the status should be StateReleased since the exchange does not have any binding diff --git a/pkg/rabbitmqamqp/amqp_queue_test.go b/pkg/rabbitmqamqp/amqp_queue_test.go index ace4307..6a9762d 100644 --- a/pkg/rabbitmqamqp/amqp_queue_test.go +++ b/pkg/rabbitmqamqp/amqp_queue_test.go @@ -37,6 +37,7 @@ var _ = Describe("AMQP Queue test ", func() { // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(err).To(BeNil()) Expect(queueInfoReceived).To(Equal(queueInfo)) err = management.DeleteQueue(context.TODO(), queueName) @@ -84,6 +85,7 @@ var _ = Describe("AMQP Queue test ", func() { // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(err).To(BeNil()) Expect(queueInfoReceived).To(Equal(queueInfo)) err = management.DeleteQueue(context.TODO(), queueName) @@ -108,6 +110,7 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.Type()).To(Equal(Quorum)) // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(err).To(BeNil()) Expect(queueInfoReceived).To(Equal(queueInfo)) err = management.DeleteQueue(context.TODO(), queueName) @@ -134,6 +137,7 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.Type()).To(Equal(Classic)) // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(err).To(BeNil()) Expect(queueInfoReceived).To(Equal(queueInfo)) err = management.DeleteQueue(context.TODO(), queueName) diff --git a/pkg/rabbitmqamqp/amqp_utils.go b/pkg/rabbitmqamqp/amqp_utils.go index ea0f05f..4ded64d 100644 --- a/pkg/rabbitmqamqp/amqp_utils.go +++ b/pkg/rabbitmqamqp/amqp_utils.go @@ -2,9 +2,10 @@ package rabbitmqamqp import ( "fmt" - "github.com/Azure/go-amqp" "math/rand" "time" + + "github.com/Azure/go-amqp" ) const AtMostOnce = 0 @@ -74,7 +75,7 @@ func random(max int) int { } func validateMessageAnnotations(annotations amqp.Annotations) error { - for k, _ := range annotations { + for k := range annotations { switch tp := k.(type) { case string: if err := validateMessageAnnotationKey(tp); err != nil { diff --git a/pkg/rabbitmqamqp/converters_test.go b/pkg/rabbitmqamqp/converters_test.go index 07b69cb..1fdc090 100644 --- a/pkg/rabbitmqamqp/converters_test.go +++ b/pkg/rabbitmqamqp/converters_test.go @@ -34,15 +34,15 @@ var _ = Describe("Converters", func() { }) It("Converts from string logError", func() { - v, err := CapacityFrom("10LL") + _, err := CapacityFrom("10LL") Expect(fmt.Sprintf("%s", err)). To(ContainSubstring("Invalid unit size format")) - v, err = CapacityFrom("aGB") + _, err = CapacityFrom("aGB") Expect(fmt.Sprintf("%s", err)). To(ContainSubstring("Invalid number format")) - v, err = CapacityFrom("") + v, err := CapacityFrom("") Expect(v).To(Equal(int64(0))) Expect(err).To(BeNil())