First consumer version (#19)

* First consumer version
---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2025-01-22 09:56:23 +01:00 committed by GitHub
parent 023979a0ad
commit cb006411f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 385 additions and 31 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/Azure/go-amqp" "github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
@ -67,34 +68,77 @@ func main() {
return return
} }
addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey) // Create a consumer to receive messages from the queue
// you need to build the address of the queue, but you can use the helper function
addrQueue, _ := rabbitmq_amqp.QueueAddress(&queueName)
consumer, err := amqpConnection.Consumer(context.Background(), addrQueue, "getting-started-consumer")
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return
}
consumerContext, cancel := context.WithCancel(context.Background())
// Consume messages from the queue
go func(ctx context.Context) {
for {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
rabbitmq_amqp.Info("[Consumer]", "consumer closed. Context", err)
return
}
if err != nil {
// An error occurred receiving the message
rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err)
return
}
rabbitmq_amqp.Info("[Consumer]", "Received message",
fmt.Sprintf("%s", deliveryContext.Message().Data))
err = deliveryContext.Accept(context.Background())
if err != nil {
rabbitmq_amqp.Error("Error accepting message", err)
return
}
}
}(consumerContext)
addr, _ := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher") publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
if err != nil { if err != nil {
rabbitmq_amqp.Error("Error creating publisher", err) rabbitmq_amqp.Error("Error creating publisher", err)
return return
} }
// Publish a message to the exchange for i := 0; i < 10; i++ {
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
if err != nil { // Publish a message to the exchange
rabbitmq_amqp.Error("Error publishing message", err) publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
return if err != nil {
} rabbitmq_amqp.Error("Error publishing message", err)
switch publishResult.Outcome { return
case &amqp.StateAccepted{}: }
rabbitmq_amqp.Info("Message accepted") switch publishResult.Outcome.(type) {
case &amqp.StateReleased{}: case *amqp.StateAccepted:
rabbitmq_amqp.Warn("Message was not routed") rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
case &amqp.StateRejected{}: break
rabbitmq_amqp.Warn("Message rejected") case *amqp.StateReleased:
stateType := publishResult.Outcome.(*amqp.StateRejected) rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
if stateType.Error != nil { break
rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error) case *amqp.StateRejected:
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*amqp.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
} }
default:
// these status are not supported
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
} }
println("press any key to close the connection") println("press any key to close the connection")
@ -102,13 +146,19 @@ func main() {
var input string var input string
_, _ = fmt.Scanln(&input) _, _ = fmt.Scanln(&input)
cancel()
//Close the consumer
err = consumer.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[Consumer]", err)
}
// Close the publisher // Close the publisher
err = publisher.Close(context.Background()) err = publisher.Close(context.Background())
if err != nil { if err != nil {
return return
} }
// Unbind the queue from the exchange
// Unbind the queue from the exchange
err = management.Unbind(context.TODO(), bindingPath) err = management.Unbind(context.TODO(), bindingPath)
if err != nil { if err != nil {
@ -143,8 +193,7 @@ func main() {
} }
fmt.Printf("AMQP Connection closed.\n") fmt.Printf("AMQP Connection closed.\n")
// Wait for the status change to be printed // not necessary. It waits for the status change to be printed
time.Sleep(500 * time.Millisecond) time.Sleep(100 * time.Millisecond)
close(stateChanged)
close(stateChangeds)
} }

View File

@ -26,7 +26,6 @@ func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, l
if !validateAddress(destinationAdd) { if !validateAddress(destinationAdd) {
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues) return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
} }
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce)) sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil { if err != nil {
return nil, err return nil, err
@ -34,6 +33,17 @@ func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, l
return newPublisher(sender), nil return newPublisher(sender), nil
} }
func (a *AmqpConnection) Consumer(ctx context.Context, destinationAdd string, linkName string) (*Consumer, error) {
if !validateAddress(destinationAdd) {
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
}
receiver, err := a.session.NewReceiver(ctx, destinationAdd, createReceiverLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil {
return nil, err
}
return newConsumer(receiver), nil
}
// Dial connect to the AMQP 1.0 server using the provided connectionSettings // Dial connect to the AMQP 1.0 server using the provided connectionSettings
// Returns a pointer to the new AmqpConnection if successful else an error. // Returns a pointer to the new AmqpConnection if successful else an error.
// addresses is a list of addresses to connect to. It picks one randomly. // addresses is a list of addresses to connect to. It picks one randomly.
@ -93,6 +103,7 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq
} }
a.Connection = conn a.Connection = conn
a.session, err = a.Connection.NewSession(ctx, nil) a.session, err = a.Connection.NewSession(ctx, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,82 @@
package rabbitmq_amqp
import (
"context"
"github.com/Azure/go-amqp"
)
type DeliveryContext struct {
receiver *amqp.Receiver
message *amqp.Message
}
func (dc *DeliveryContext) Message() *amqp.Message {
return dc.message
}
func (dc *DeliveryContext) Accept(ctx context.Context) error {
return dc.receiver.AcceptMessage(ctx, dc.message)
}
func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error {
return dc.receiver.RejectMessage(ctx, dc.message, e)
}
func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
if err := validateMessageAnnotations(annotations); err != nil {
return err
}
// copy the rabbitmq annotations to amqp annotations
destination := make(amqp.Annotations)
for key, value := range annotations {
destination[key] = value
}
return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{
DeliveryFailed: true,
UndeliverableHere: true,
Annotations: destination,
})
}
func (dc *DeliveryContext) Requeue(ctx context.Context) error {
return dc.receiver.ReleaseMessage(ctx, dc.message)
}
func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
if err := validateMessageAnnotations(annotations); err != nil {
return err
}
// copy the rabbitmq annotations to amqp annotations
destination := make(amqp.Annotations)
for key, value := range annotations {
destination[key] = value
}
return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{
DeliveryFailed: false,
UndeliverableHere: false,
Annotations: destination,
})
}
type Consumer struct {
receiver *amqp.Receiver
}
func newConsumer(receiver *amqp.Receiver) *Consumer {
return &Consumer{receiver: receiver}
}
func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
msg, err := c.receiver.Receive(ctx, nil)
if err != nil {
return nil, err
}
return &DeliveryContext{receiver: c.receiver, message: msg}, nil
}
func (c *Consumer) Close(ctx context.Context) error {
return c.receiver.Close(ctx)
}

View File

@ -0,0 +1,181 @@
package rabbitmq_amqp
import (
"context"
"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"time"
)
var _ = Describe("Consumer tests", func() {
It("AMQP Consumer should fail due to context cancellation", func() {
qName := generateNameWithDateTime("AMQP Consumer should fail due to context cancellation")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
addr, _ := QueueAddress(&qName)
queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
Name: qName,
IsAutoDelete: false,
IsExclusive: false,
QueueType: QueueType{Quorum},
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
cancel()
_, err = connection.Consumer(ctx, addr, "test")
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("context canceled"))
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Consumer should ack and empty the queue", func() {
qName := generateNameWithDateTime("AMQP Consumer should ack and empty the queue")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
Name: qName,
IsAutoDelete: false,
IsExclusive: false,
QueueType: QueueType{Quorum},
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 10)
addr, _ := QueueAddress(&qName)
consumer, err := connection.Consumer(context.Background(), addr, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
for i := 0; i < 10; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(dc.Accept(context.Background())).To(BeNil())
}
nMessages, err := connection.Management().PurgeQueue(context.Background(), qName)
Expect(err).To(BeNil())
Expect(nMessages).To(Equal(0))
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Consumer should requeue the message to the queue", func() {
qName := generateNameWithDateTime("AMQP Consumer should requeue the message to the queue")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
Name: qName,
IsAutoDelete: false,
IsExclusive: false,
QueueType: QueueType{Quorum},
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 1)
addr, _ := QueueAddress(&qName)
consumer, err := connection.Consumer(context.Background(), addr, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(dc.Requeue(context.Background())).To(BeNil())
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(err).To(BeNil())
nMessages, err := connection.Management().PurgeQueue(context.Background(), qName)
Expect(nMessages).To(Equal(1))
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Consumer should requeue the message to the queue with annotations", func() {
qName := generateNameWithDateTime("AMQP Consumer should requeue the message to the queue with annotations")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
Name: qName,
IsAutoDelete: false,
IsExclusive: false,
QueueType: QueueType{Quorum},
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 1)
addr, _ := QueueAddress(&qName)
consumer, err := connection.Consumer(context.Background(), addr, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
myAnnotations := amqp.Annotations{
"x-key1": "value1",
"x-key2": "value2",
}
Expect(dc.RequeueWithAnnotations(context.Background(), myAnnotations)).To(BeNil())
dcWithAnnotation, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dcWithAnnotation.Message().Annotations["x-key1"]).To(Equal("value1"))
Expect(dcWithAnnotation.Message().Annotations["x-key2"]).To(Equal("value2"))
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(err).To(BeNil())
nMessages, err := connection.Management().PurgeQueue(context.Background(), qName)
Expect(nMessages).To(Equal(1))
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
It("AMQP Consumer should discard the message to the queue with and without annotations", func() {
// TODO: Implement this test with a dead letter queue to test the discard feature
qName := generateNameWithDateTime("AMQP Consumer should discard the message to the queue with and without annotations")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
Name: qName,
IsAutoDelete: false,
IsExclusive: false,
QueueType: QueueType{Quorum},
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 2)
addr, _ := QueueAddress(&qName)
consumer, err := connection.Consumer(context.Background(), addr, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
myAnnotations := amqp.Annotations{
"x-key1": "value1",
"x-key2": "value2",
}
Expect(dc.DiscardWithAnnotations(context.Background(), myAnnotations)).To(BeNil())
dc, err = consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(dc.Discard(context.Background(), &amqp.Error{
Condition: "my error",
Description: "my error description",
Info: nil,
})).To(BeNil())
nMessages, err := connection.Management().PurgeQueue(context.Background(), qName)
Expect(nMessages).To(Equal(0))
Expect(consumer.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
})

View File

@ -29,7 +29,7 @@ func NewAmqpManagement() *AmqpManagement {
func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error {
if a.receiver == nil { if a.receiver == nil {
opts := createReceiverLinkOptions(managementNodeAddress, linkPairName) opts := createReceiverLinkOptions(managementNodeAddress, linkPairName, AtMostOnce)
receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts) receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts)
if err != nil { if err != nil {
return err return err

View File

@ -27,7 +27,6 @@ func newPublisher(sender *amqp.Sender) *Publisher {
// - StateRejected // - StateRejected
// See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information. // See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) { func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) {
r, err := m.sender.SendWithReceipt(ctx, message, nil) r, err := m.sender.SendWithReceipt(ctx, message, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -36,7 +35,6 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis
if err != nil { if err != nil {
return nil, err return nil, err
} }
publishResult := &PublishResult{ publishResult := &PublishResult{
Message: message, Message: message,
Outcome: state, Outcome: state,

View File

@ -1,6 +1,7 @@
package rabbitmq_amqp package rabbitmq_amqp
import ( import (
"fmt"
"github.com/Azure/go-amqp" "github.com/Azure/go-amqp"
"math/rand" "math/rand"
"time" "time"
@ -39,16 +40,27 @@ func createSenderLinkOptions(address string, linkName string, deliveryMode int)
// receiverLinkOptions returns the options for a receiver link // receiverLinkOptions returns the options for a receiver link
// with the given address and link name. // with the given address and link name.
// That should be the same for all the links. // That should be the same for all the links.
func createReceiverLinkOptions(address string, linkName string) *amqp.ReceiverOptions { func createReceiverLinkOptions(address string, linkName string, deliveryMode int) *amqp.ReceiverOptions {
prop := make(map[string]any) prop := make(map[string]any)
prop["paired"] = true prop["paired"] = true
receiverSettleMode := amqp.SenderSettleModeSettled.Ptr()
/// SndSettleMode = deliveryMode == DeliveryMode.AtMostOnce
// ? SenderSettleMode.Settled
// : SenderSettleMode.Unsettled,
if deliveryMode == AtLeastOnce {
receiverSettleMode = amqp.SenderSettleModeUnsettled.Ptr()
}
return &amqp.ReceiverOptions{ return &amqp.ReceiverOptions{
TargetAddress: address, TargetAddress: address,
DynamicAddress: false, DynamicAddress: false,
Name: linkName, Name: linkName,
Properties: prop, Properties: prop,
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(), Durability: 0,
ExpiryTimeout: 0,
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
RequestedSenderSettleMode: receiverSettleMode,
ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, ExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
Credit: 100, Credit: 100,
} }
@ -58,3 +70,24 @@ func random(max int) int {
r := rand.New(rand.NewSource(time.Now().Unix())) r := rand.New(rand.NewSource(time.Now().Unix()))
return r.Intn(max) return r.Intn(max)
} }
func validateMessageAnnotations(annotations amqp.Annotations) error {
for k, _ := range annotations {
switch tp := k.(type) {
case string:
if err := validateMessageAnnotationKey(tp); err != nil {
return err
}
default:
return fmt.Errorf("message annotation key must be a string: %v", k)
}
}
return nil
}
func validateMessageAnnotationKey(key string) error {
if key[:2] != "x-" {
return fmt.Errorf("message annotation key must start with 'x-': %s", key)
}
return nil
}