Add AmqpQueue.Purge() (#14)

This commit is contained in:
Michal Kuratczyk 2024-10-01 23:39:33 +02:00 committed by GitHub
parent 95d4df61ad
commit e662d9534b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 53 additions and 1 deletions

View File

@ -2,6 +2,7 @@ package rabbitmq_amqp
import (
"context"
"github.com/Azure/go-amqp"
)
@ -164,6 +165,12 @@ func (a *AmqpQueue) Delete(ctx context.Context) error {
return err
}
func (a *AmqpQueue) Purge(ctx context.Context) (int, error) {
path := queuePurgePath(a.name)
response, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200})
return int(response["message_count"].(uint64)), err
}
func (a *AmqpQueue) Name(queueName string) IQueueSpecification {
a.name = queueName
return a

View File

@ -2,6 +2,9 @@ package rabbitmq_amqp
import (
"context"
"strconv"
"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
@ -146,4 +149,40 @@ var _ = Describe("AMQP Queue test ", func() {
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
It("AMQP Purge Queue should succeed and return the number of messages purged", func() {
const queueName = "AMQP Purge Queue should succeed and return the number of messages purged"
queueSpec := management.Queue(queueName)
_, err := queueSpec.Declare(context.TODO())
Expect(err).To(BeNil())
publishMessages(queueName, 10)
purged, err := queueSpec.Purge(context.TODO())
Expect(err).To(BeNil())
Expect(purged).To(Equal(10))
})
})
// TODO: This should be replaced with this library's publish function
// but for the time being, we need a way to publish messages or test purposes
func publishMessages(queueName string, count int) {
conn, err := amqp.Dial(context.TODO(), "amqp://guest:guest@localhost", nil)
if err != nil {
Fail(err.Error())
}
session, err := conn.NewSession(context.TODO(), nil)
if err != nil {
Fail(err.Error())
}
sender, err := session.NewSender(context.TODO(), queuePath(queueName), nil)
if err != nil {
Fail(err.Error())
}
for i := 0; i < count; i++ {
err = sender.Send(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))), nil)
if err != nil {
Fail(err.Error())
}
}
}

View File

@ -4,9 +4,10 @@ import (
"crypto/md5"
"encoding/base64"
"fmt"
"github.com/google/uuid"
"net/url"
"strings"
"github.com/google/uuid"
)
const (
@ -69,6 +70,10 @@ func queuePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName)
}
func queuePurgePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName) + "/messages"
}
func exchangePath(exchangeName string) string {
return "/" + exchanges + "/" + encodePathSegments(exchangeName)
}

View File

@ -37,6 +37,7 @@ type IQueueSpecification interface {
MaxLengthBytes(length int64) IQueueSpecification
DeadLetterExchange(dlx string) IQueueSpecification
DeadLetterRoutingKey(dlrk string) IQueueSpecification
Purge(ctx context.Context) (int, error)
}
// IQueueInfo represents the information of a queue