220 lines
6.5 KiB
Go
220 lines
6.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/Azure/go-amqp"
|
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
func main() {
|
|
queueName := "reliable-amqp10-go-queue"
|
|
var stateAccepted int32
|
|
var stateReleased int32
|
|
var stateRejected int32
|
|
var isRunning bool
|
|
|
|
var received int32
|
|
var failed int32
|
|
|
|
startTime := time.Now()
|
|
isRunning = true
|
|
go func() {
|
|
for isRunning {
|
|
time.Sleep(5 * time.Second)
|
|
total := stateAccepted + stateReleased + stateRejected
|
|
messagesPerSecond := float64(total) / time.Since(startTime).Seconds()
|
|
rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond)
|
|
}
|
|
}()
|
|
|
|
rmq.Info("How to deal with network disconnections")
|
|
signalBlock := sync.Cond{L: &sync.Mutex{}}
|
|
/// Create a channel to receive state change notifications
|
|
stateChanged := make(chan *rmq.StateChanged, 1)
|
|
go func(ch chan *rmq.StateChanged) {
|
|
for statusChanged := range ch {
|
|
rmq.Info("[connection]", "Status changed", statusChanged)
|
|
switch statusChanged.To.(type) {
|
|
case *rmq.StateOpen:
|
|
signalBlock.Broadcast()
|
|
case *rmq.StateReconnecting:
|
|
rmq.Info("[connection]", "Reconnecting to the AMQP 1.0 server")
|
|
case *rmq.StateClosed:
|
|
StateClosed := statusChanged.To.(*rmq.StateClosed)
|
|
if errors.Is(StateClosed.GetError(), rmq.ErrMaxReconnectAttemptsReached) {
|
|
rmq.Error("[connection]", "Max reconnect attempts reached. Closing connection", StateClosed.GetError())
|
|
signalBlock.Broadcast()
|
|
isRunning = false
|
|
}
|
|
|
|
}
|
|
}
|
|
}(stateChanged)
|
|
|
|
// Open a connection to the AMQP 1.0 server
|
|
amqpConnection, err := rmq.Dial(context.Background(), "amqp://", &rmq.AmqpConnOptions{
|
|
SASLType: amqp.SASLTypeAnonymous(),
|
|
ContainerID: "reliable-amqp10-go",
|
|
RecoveryConfiguration: &rmq.RecoveryConfiguration{
|
|
ActiveRecovery: true,
|
|
BackOffReconnectInterval: 2 * time.Second, // we reduce the reconnect interval to speed up the test. The default is 5 seconds
|
|
// In production, you should avoid BackOffReconnectInterval with low values since it can cause a high number of reconnection attempts
|
|
MaxReconnectAttempts: 5,
|
|
},
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error opening connection", err)
|
|
return
|
|
}
|
|
// Register the channel to receive status change notifications
|
|
amqpConnection.NotifyStatusChange(stateChanged)
|
|
|
|
fmt.Printf("AMQP connection opened.\n")
|
|
// Create the management interface for the connection
|
|
// so we can declare exchanges, queues, and bindings
|
|
management := amqpConnection.Management()
|
|
|
|
// Declare a Quorum queue
|
|
queueInfo, err := management.DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{
|
|
Name: queueName,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error declaring queue", err)
|
|
return
|
|
}
|
|
|
|
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
|
|
if err != nil {
|
|
rmq.Error("Error creating consumer", err)
|
|
return
|
|
}
|
|
|
|
consumerContext, cancel := context.WithCancel(context.Background())
|
|
|
|
// Consume messages from the queue
|
|
go func(ctx context.Context) {
|
|
for isRunning {
|
|
deliveryContext, err := consumer.Receive(ctx)
|
|
if errors.Is(err, context.Canceled) {
|
|
// The consumer was closed correctly
|
|
return
|
|
}
|
|
if err != nil && isRunning {
|
|
// An error occurred receiving the message
|
|
// here the consumer could be disconnected from the server due to a network error
|
|
signalBlock.L.Lock()
|
|
rmq.Info("[Consumer]", "Consumer is blocked, queue", queueName, "error", err)
|
|
signalBlock.Wait()
|
|
rmq.Info("[Consumer]", "Consumer is unblocked, queue", queueName)
|
|
|
|
signalBlock.L.Unlock()
|
|
continue
|
|
}
|
|
|
|
atomic.AddInt32(&received, 1)
|
|
err = deliveryContext.Accept(context.Background())
|
|
if err != nil && isRunning {
|
|
// same here the delivery could not be accepted due to a network error
|
|
// we wait for 2_500 ms and try again
|
|
time.Sleep(2500 * time.Millisecond)
|
|
continue
|
|
}
|
|
}
|
|
}(consumerContext)
|
|
|
|
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{
|
|
Queue: queueName,
|
|
}, nil)
|
|
if err != nil {
|
|
rmq.Error("Error creating publisher", err)
|
|
return
|
|
}
|
|
|
|
for i := 0; i < 1; i++ {
|
|
go func() {
|
|
for i := 0; i < 500_000; i++ {
|
|
if !isRunning {
|
|
rmq.Info("[Publisher]", "Publisher is stopped simulation not running, queue", queueName)
|
|
return
|
|
}
|
|
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
|
|
if err != nil {
|
|
// here you need to deal with the error. You can store the message in a local in memory/persistent storage
|
|
// then retry to send the message as soon as the connection is reestablished
|
|
|
|
atomic.AddInt32(&failed, 1)
|
|
// block signalBlock until the connection is reestablished
|
|
signalBlock.L.Lock()
|
|
rmq.Info("[Publisher]", "Publisher is blocked, queue", queueName, "error", err)
|
|
signalBlock.Wait()
|
|
rmq.Info("[Publisher]", "Publisher is unblocked, queue", queueName)
|
|
signalBlock.L.Unlock()
|
|
|
|
} else {
|
|
switch publishResult.Outcome.(type) {
|
|
case *rmq.StateAccepted:
|
|
atomic.AddInt32(&stateAccepted, 1)
|
|
case *rmq.StateReleased:
|
|
atomic.AddInt32(&stateReleased, 1)
|
|
case *rmq.StateRejected:
|
|
atomic.AddInt32(&stateRejected, 1)
|
|
default:
|
|
// these status are not supported. Leave it for AMQP 1.0 compatibility
|
|
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
|
|
rmq.Warn("Message state: %v", publishResult.Outcome)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
println("press any key to close the connection")
|
|
|
|
var input string
|
|
_, _ = fmt.Scanln(&input)
|
|
|
|
cancel()
|
|
//Close the consumer
|
|
err = consumer.Close(context.Background())
|
|
if err != nil {
|
|
rmq.Error("[NewConsumer]", err)
|
|
return
|
|
}
|
|
// Close the publisher
|
|
err = publisher.Close(context.Background())
|
|
if err != nil {
|
|
rmq.Error("[NewPublisher]", err)
|
|
return
|
|
}
|
|
|
|
// Purge the queue
|
|
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
|
|
if err != nil {
|
|
fmt.Printf("Error purging queue: %v\n", err)
|
|
return
|
|
}
|
|
fmt.Printf("Purged %d messages from the queue.\n", purged)
|
|
|
|
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
|
|
if err != nil {
|
|
fmt.Printf("Error deleting queue: %v\n", err)
|
|
return
|
|
}
|
|
|
|
err = amqpConnection.Close(context.Background())
|
|
if err != nil {
|
|
fmt.Printf("Error closing connection: %v\n", err)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("AMQP connection closed.\n")
|
|
// not necessary. It waits for the status change to be printed
|
|
time.Sleep(100 * time.Millisecond)
|
|
close(stateChanged)
|
|
}
|