Add ErrMaxReconnectAttemptsReached (#49)

* add error max ErrMaxReconnectAttemptsReached
* closes https://github.com/rabbitmq/rabbitmq-amqp-go-client/issues/48
---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
Gabriele Santomaggio 2025-06-18 14:46:55 +02:00 committed by GitHub
parent 25962eccd1
commit ccbc8d1c16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 35 additions and 13 deletions

View File

@ -1,13 +1,13 @@
# RabbitMQ AMQP 1.0 Golang Client # RabbitMQ AMQP 1.0 Golang Client
This library is meant to be used with RabbitMQ 4.0. This library is meant to be used with RabbitMQ 4.0.
Suitable for testing in pre-production environments.
## Getting Started ## Getting Started
- [Getting Started](docs/examples/getting_started) - [Getting Started](docs/examples/getting_started)
- [Examples](docs/examples) - [Examples](docs/examples)
Inside the `docs/examples` directory you will find several examples to get you started.</br>
Also advanced examples like how to use streams, how to handle reconnections, and how to use TLS.
- Getting started Video tutorial: </br> - Getting started Video tutorial: </br>
[![Getting Started](https://img.youtube.com/vi/iR1JUFh3udI/0.jpg)](https://youtu.be/iR1JUFh3udI) [![Getting Started](https://img.youtube.com/vi/iR1JUFh3udI/0.jpg)](https://youtu.be/iR1JUFh3udI)

View File

@ -16,18 +16,19 @@ func main() {
var stateAccepted int32 var stateAccepted int32
var stateReleased int32 var stateReleased int32
var stateRejected int32 var stateRejected int32
var isRunning bool
var received int32 var received int32
var failed int32 var failed int32
startTime := time.Now() startTime := time.Now()
isRunning = true
go func() { go func() {
for { for isRunning {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
total := stateAccepted + stateReleased + stateRejected total := stateAccepted + stateReleased + stateRejected
messagesPerSecond := float64(total) / time.Since(startTime).Seconds() messagesPerSecond := float64(total) / time.Since(startTime).Seconds()
rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond) rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond)
} }
}() }()
@ -41,6 +42,16 @@ func main() {
switch statusChanged.To.(type) { switch statusChanged.To.(type) {
case *rmq.StateOpen: case *rmq.StateOpen:
signalBlock.Broadcast() signalBlock.Broadcast()
case *rmq.StateReconnecting:
rmq.Info("[connection]", "Reconnecting to the AMQP 1.0 server")
case *rmq.StateClosed:
StateClosed := statusChanged.To.(*rmq.StateClosed)
if errors.Is(StateClosed.GetError(), rmq.ErrMaxReconnectAttemptsReached) {
rmq.Error("[connection]", "Max reconnect attempts reached. Closing connection", StateClosed.GetError())
signalBlock.Broadcast()
isRunning = false
}
} }
} }
}(stateChanged) }(stateChanged)
@ -87,13 +98,13 @@ func main() {
// Consume messages from the queue // Consume messages from the queue
go func(ctx context.Context) { go func(ctx context.Context) {
for { for isRunning {
deliveryContext, err := consumer.Receive(ctx) deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
// The consumer was closed correctly // The consumer was closed correctly
return return
} }
if err != nil { if err != nil && isRunning {
// An error occurred receiving the message // An error occurred receiving the message
// here the consumer could be disconnected from the server due to a network error // here the consumer could be disconnected from the server due to a network error
signalBlock.L.Lock() signalBlock.L.Lock()
@ -107,7 +118,7 @@ func main() {
atomic.AddInt32(&received, 1) atomic.AddInt32(&received, 1)
err = deliveryContext.Accept(context.Background()) err = deliveryContext.Accept(context.Background())
if err != nil { if err != nil && isRunning {
// same here the delivery could not be accepted due to a network error // same here the delivery could not be accepted due to a network error
// we wait for 2_500 ms and try again // we wait for 2_500 ms and try again
time.Sleep(2500 * time.Millisecond) time.Sleep(2500 * time.Millisecond)
@ -124,12 +135,13 @@ func main() {
return return
} }
wg := &sync.WaitGroup{}
for i := 0; i < 1; i++ { for i := 0; i < 1; i++ {
wg.Add(1)
go func() { go func() {
defer wg.Done()
for i := 0; i < 500_000; i++ { for i := 0; i < 500_000; i++ {
if !isRunning {
rmq.Info("[Publisher]", "Publisher is stopped simulation not running, queue", queueName)
return
}
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil { if err != nil {
// here you need to deal with the error. You can store the message in a local in memory/persistent storage // here you need to deal with the error. You can store the message in a local in memory/persistent storage
@ -160,7 +172,6 @@ func main() {
} }
}() }()
} }
wg.Wait()
println("press any key to close the connection") println("press any key to close the connection")

View File

@ -58,8 +58,8 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
kv[destination] = b.destinationName kv[destination] = b.destinationName
kv["arguments"] = make(map[string]any) kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204}) _, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey) bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
return bindingPathWithExchangeQueueKey, err return bindingPathWithExchangeQueueAndKey, err
} }
// Unbind removes a binding between an exchange and a queue or exchange // Unbind removes a binding between an exchange and a queue or exchange

View File

@ -350,6 +350,10 @@ func (a *AmqpConnection) maybeReconnect() {
Error("Reconnection attempt failed", "attempt", attempt, "error", err, "ID", a.Id()) Error("Reconnection attempt failed", "attempt", attempt, "error", err, "ID", a.Id())
} }
// If we reach here, all attempts failed
Error("All reconnection attempts failed, closing connection", "ID", a.Id())
a.lifeCycle.SetState(&StateClosed{error: ErrMaxReconnectAttemptsReached})
} }
// restartEntities attempts to restart all publishers and consumers after a reconnection // restartEntities attempts to restart all publishers and consumers after a reconnection

View File

@ -1,10 +1,14 @@
package rabbitmqamqp package rabbitmqamqp
import ( import (
"errors"
"sync" "sync"
"time" "time"
) )
// ErrMaxReconnectAttemptsReached typed error when the MaxReconnectAttempts is reached
var ErrMaxReconnectAttemptsReached = errors.New("max reconnect attempts reached, connection will not be recovered")
type RecoveryConfiguration struct { type RecoveryConfiguration struct {
/* /*
ActiveRecovery Define if the recovery is activated. ActiveRecovery Define if the recovery is activated.

View File

@ -77,6 +77,9 @@ func (s StateChanged) String() string {
switch s.To.(type) { switch s.To.(type) {
case *StateClosed: case *StateClosed:
if s.To.(*StateClosed).error == nil {
return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To))
}
return fmt.Sprintf("From: %s, To: %s, Error: %s", statusToString(s.From), statusToString(s.To), s.To.(*StateClosed).error) return fmt.Sprintf("From: %s, To: %s, Error: %s", statusToString(s.From), statusToString(s.To), s.To.(*StateClosed).error)
} }