diff --git a/pkg/rabbitmqamqp/amqp_publisher.go b/pkg/rabbitmqamqp/amqp_publisher.go index 5fd8f7d..eee125a 100644 --- a/pkg/rabbitmqamqp/amqp_publisher.go +++ b/pkg/rabbitmqamqp/amqp_publisher.go @@ -62,7 +62,7 @@ RabbitMQ supports the following DeliveryState types: - StateRejected See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information. -Note: If the destination address is not defined during the creation, the message must have a TO property set. +If the destination address is not defined during the creation, the message must have a TO property set. You can use the helper "MessagePropertyToAddress" to create the destination address. See the examples: Create a new publisher that sends messages to a specific destination address: @@ -84,6 +84,16 @@ Create a new publisher that sends messages based on message destination address: ..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"}) ..:= publisher.Publish(context.Background(), msg) + + +The message is persistent by default by setting the Header.Durable to true when Header is nil. +You can set the message to be non-persistent by setting the Header.Durable to false. +Note: +When you use the `Header` is up to you to set the message properties, +You need set the `Header.Durable` to true or false. + + + */ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) { @@ -97,6 +107,14 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis return nil, err } } + + // set the default persistence to the message + if message.Header == nil { + message.Header = &amqp.MessageHeader{ + Durable: true, + } + } + r, err := m.sender.Load().SendWithReceipt(ctx, message, nil) if err != nil { return nil, err diff --git a/pkg/rabbitmqamqp/amqp_publisher_test.go b/pkg/rabbitmqamqp/amqp_publisher_test.go index 5617bf6..0959e4d 100644 --- a/pkg/rabbitmqamqp/amqp_publisher_test.go +++ b/pkg/rabbitmqamqp/amqp_publisher_test.go @@ -203,4 +203,65 @@ var _ = Describe("AMQP publisher ", func() { Expect(connection.Close(context.Background())).To(BeNil()) }) + + It("Message should durable by default", func() { + // https://github.com/rabbitmq/rabbitmq-server/pull/13918 + + // Here we test the default behavior of the message durability + // The lib should set the Header.Durable to true by default + // when the Header is set by the user + // it is up to the user to set the Header.Durable to true or false + connection, err := Dial(context.Background(), "amqp://", nil) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + name := generateNameWithDateTime("Message should durable by default") + _, err = connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ + Name: name, + }) + Expect(err).To(BeNil()) + + publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: name}, nil) + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) + + msg := NewMessage([]byte("hello")) + Expect(msg.Header).To(BeNil()) + publishResult, err := publisher.Publish(context.Background(), msg) + Expect(err).To(BeNil()) + Expect(publishResult).NotTo(BeNil()) + Expect(publishResult.Outcome).To(Equal(&StateAccepted{})) + Expect(msg.Header).NotTo(BeNil()) + Expect(msg.Header.Durable).To(BeTrue()) + + consumer, err := connection.NewConsumer(context.Background(), name, nil) + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc).NotTo(BeNil()) + Expect(dc.Message().Header).NotTo(BeNil()) + Expect(dc.Message().Header.Durable).To(BeTrue()) + Expect(dc.Accept(context.Background())).To(BeNil()) + + msgNotPersistent := NewMessage([]byte("hello")) + msgNotPersistent.Header = &amqp.MessageHeader{ + Durable: false, + } + publishResult, err = publisher.Publish(context.Background(), msgNotPersistent) + Expect(err).To(BeNil()) + Expect(publishResult).NotTo(BeNil()) + Expect(publishResult.Outcome).To(Equal(&StateAccepted{})) + Expect(msgNotPersistent.Header).NotTo(BeNil()) + Expect(msgNotPersistent.Header.Durable).To(BeFalse()) + dc, err = consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc).NotTo(BeNil()) + Expect(dc.Message().Header).NotTo(BeNil()) + Expect(dc.Message().Header.Durable).To(BeFalse()) + Expect(dc.Accept(context.Background())).To(BeNil()) + Expect(publisher.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), name)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + + }) }) diff --git a/pkg/rabbitmqamqp/amqp_queue.go b/pkg/rabbitmqamqp/amqp_queue.go index 485865b..97af575 100644 --- a/pkg/rabbitmqamqp/amqp_queue.go +++ b/pkg/rabbitmqamqp/amqp_queue.go @@ -29,13 +29,18 @@ func (a *AmqpQueueInfo) Members() []string { } func newAmqpQueueInfo(response map[string]any) *AmqpQueueInfo { + leader := "" + if response["leader"] != nil { + leader = response["leader"].(string) + } + return &AmqpQueueInfo{ name: response["name"].(string), isDurable: response["durable"].(bool), isAutoDelete: response["auto_delete"].(bool), isExclusive: response["exclusive"].(bool), queueType: TQueueType(response["type"].(string)), - leader: response["leader"].(string), + leader: leader, members: response["replicas"].([]string), arguments: response["arguments"].(map[string]any), consumerCount: response["consumer_count"].(uint32),