diff --git a/README.md b/README.md index 8cdb872..a87e5ea 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ # RabbitMQ AMQP 1.0 Golang Client This library is meant to be used with RabbitMQ 4.0. -Suitable for testing in pre-production environments. - ## Getting Started - [Getting Started](docs/examples/getting_started) - [Examples](docs/examples) + Inside the `docs/examples` directory you will find several examples to get you started.
+ Also advanced examples like how to use streams, how to handle reconnections, and how to use TLS. - Getting started Video tutorial:
[![Getting Started](https://img.youtube.com/vi/iR1JUFh3udI/0.jpg)](https://youtu.be/iR1JUFh3udI) diff --git a/docs/examples/reliable/reliable.go b/docs/examples/reliable/reliable.go index 36f3928..4106b83 100644 --- a/docs/examples/reliable/reliable.go +++ b/docs/examples/reliable/reliable.go @@ -16,18 +16,19 @@ func main() { var stateAccepted int32 var stateReleased int32 var stateRejected int32 + var isRunning bool var received int32 var failed int32 startTime := time.Now() + isRunning = true go func() { - for { + for isRunning { time.Sleep(5 * time.Second) total := stateAccepted + stateReleased + stateRejected messagesPerSecond := float64(total) / time.Since(startTime).Seconds() rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond) - } }() @@ -41,6 +42,16 @@ func main() { switch statusChanged.To.(type) { case *rmq.StateOpen: signalBlock.Broadcast() + case *rmq.StateReconnecting: + rmq.Info("[connection]", "Reconnecting to the AMQP 1.0 server") + case *rmq.StateClosed: + StateClosed := statusChanged.To.(*rmq.StateClosed) + if errors.Is(StateClosed.GetError(), rmq.ErrMaxReconnectAttemptsReached) { + rmq.Error("[connection]", "Max reconnect attempts reached. Closing connection", StateClosed.GetError()) + signalBlock.Broadcast() + isRunning = false + } + } } }(stateChanged) @@ -87,13 +98,13 @@ func main() { // Consume messages from the queue go func(ctx context.Context) { - for { + for isRunning { deliveryContext, err := consumer.Receive(ctx) if errors.Is(err, context.Canceled) { // The consumer was closed correctly return } - if err != nil { + if err != nil && isRunning { // An error occurred receiving the message // here the consumer could be disconnected from the server due to a network error signalBlock.L.Lock() @@ -107,7 +118,7 @@ func main() { atomic.AddInt32(&received, 1) err = deliveryContext.Accept(context.Background()) - if err != nil { + if err != nil && isRunning { // same here the delivery could not be accepted due to a network error // we wait for 2_500 ms and try again time.Sleep(2500 * time.Millisecond) @@ -124,12 +135,13 @@ func main() { return } - wg := &sync.WaitGroup{} for i := 0; i < 1; i++ { - wg.Add(1) go func() { - defer wg.Done() for i := 0; i < 500_000; i++ { + if !isRunning { + rmq.Info("[Publisher]", "Publisher is stopped simulation not running, queue", queueName) + return + } publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) if err != nil { // here you need to deal with the error. You can store the message in a local in memory/persistent storage @@ -160,7 +172,6 @@ func main() { } }() } - wg.Wait() println("press any key to close the connection") diff --git a/pkg/rabbitmqamqp/amqp_binding.go b/pkg/rabbitmqamqp/amqp_binding.go index 5638c3a..cbae973 100644 --- a/pkg/rabbitmqamqp/amqp_binding.go +++ b/pkg/rabbitmqamqp/amqp_binding.go @@ -58,8 +58,8 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) { kv[destination] = b.destinationName kv["arguments"] = make(map[string]any) _, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204}) - bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey) - return bindingPathWithExchangeQueueKey, err + bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey) + return bindingPathWithExchangeQueueAndKey, err } // Unbind removes a binding between an exchange and a queue or exchange diff --git a/pkg/rabbitmqamqp/amqp_connection.go b/pkg/rabbitmqamqp/amqp_connection.go index d51b628..8d65fcf 100644 --- a/pkg/rabbitmqamqp/amqp_connection.go +++ b/pkg/rabbitmqamqp/amqp_connection.go @@ -350,6 +350,10 @@ func (a *AmqpConnection) maybeReconnect() { Error("Reconnection attempt failed", "attempt", attempt, "error", err, "ID", a.Id()) } + // If we reach here, all attempts failed + Error("All reconnection attempts failed, closing connection", "ID", a.Id()) + a.lifeCycle.SetState(&StateClosed{error: ErrMaxReconnectAttemptsReached}) + } // restartEntities attempts to restart all publishers and consumers after a reconnection diff --git a/pkg/rabbitmqamqp/amqp_connection_recovery.go b/pkg/rabbitmqamqp/amqp_connection_recovery.go index 5a3871d..2dac826 100644 --- a/pkg/rabbitmqamqp/amqp_connection_recovery.go +++ b/pkg/rabbitmqamqp/amqp_connection_recovery.go @@ -1,10 +1,14 @@ package rabbitmqamqp import ( + "errors" "sync" "time" ) +// ErrMaxReconnectAttemptsReached typed error when the MaxReconnectAttempts is reached +var ErrMaxReconnectAttemptsReached = errors.New("max reconnect attempts reached, connection will not be recovered") + type RecoveryConfiguration struct { /* ActiveRecovery Define if the recovery is activated. diff --git a/pkg/rabbitmqamqp/life_cycle.go b/pkg/rabbitmqamqp/life_cycle.go index e6feb9e..eb108c5 100644 --- a/pkg/rabbitmqamqp/life_cycle.go +++ b/pkg/rabbitmqamqp/life_cycle.go @@ -77,6 +77,9 @@ func (s StateChanged) String() string { switch s.To.(type) { case *StateClosed: + if s.To.(*StateClosed).error == nil { + return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To)) + } return fmt.Sprintf("From: %s, To: %s, Error: %s", statusToString(s.From), statusToString(s.To), s.To.(*StateClosed).error) }