golang-demo/rabbitmq_example/receiver/receiver.go

172 lines
4.2 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
func main() {
exchangeName := "getting-started-go-exchange"
queueName := "getting-started-go-queue"
routingKey := "routing-key"
// Delcare Dead Letter Exchange
deadExchangeName := "getting-started-go-dead-letter-exchange"
deadQueueName := "getting-started-go-dead-letter-queue"
deadRoutingKey := "dead-letter-routing-key"
rmq.Info("Starting AMQP Go AMQP 1.0 Consumer")
// Create a channel to receive connection state change notifications
stateChanged := make(chan *rmq.StateChanged, 1)
go func(ch chan *rmq.StateChanged) {
for statusChanged := range ch {
rmq.Info("[consumer connection]", "Status changed", statusChanged)
}
}(stateChanged)
// Setup environment
env := rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil)
// Open connection
amqpConnection, err := env.NewConnection(context.Background())
if err != nil {
rmq.Error("Error opening connection", err)
return
}
amqpConnection.NotifyStatusChange(stateChanged)
defer func() {
err = env.CloseConnections(context.Background())
if err != nil {
rmq.Error("Error closing connection: %v\n", err)
}
close(stateChanged)
}()
rmq.Info("AMQP connection opened for consumer")
// Create management interface
management := amqpConnection.Management()
// Bind queue to exchange
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: deadExchangeName,
DestinationQueue: deadQueueName,
BindingKey: deadRoutingKey,
})
if err != nil {
rmq.Error("Error binding", err)
return
}
// Bind queue to exchange (idempotent operation)
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
})
if err != nil {
rmq.Error("Error binding", err)
return
}
// use go func to contain this dead consumer
go func() {
conn, err := amqp.Dial("amqp://ecl3000:ecl3000@192.168.2.104:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// ch.ExchangeDeclare()
// use QueueDeclare func make sure the queue exists
q, err := ch.QueueDeclare(
deadQueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// ch.QueueBind()
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
for d := range msgs {
log.Printf("Received a message from DLQ: %s", d.Body)
}
}()
// Create consumer
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rmq.Error("Error creating consumer", err)
return
}
defer consumer.Close(context.Background())
consumerContext, cancel := context.WithCancel(context.Background())
defer cancel()
// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Start a goroutine to handle shutdown signals
go func() {
<-sigChan
rmq.Info("Received shutdown signal, stopping consumer...")
cancel()
}()
// Consume messages
rmq.Info("Consumer ready to receive messages")
for {
deliveryContext, err := consumer.Receive(consumerContext)
if errors.Is(err, context.Canceled) {
rmq.Info("[Consumer]", "consumer closed gracefully")
return
}
if err != nil {
rmq.Error("[Consumer]", "Error receiving message", err)
return
}
rmq.Info("[Consumer]", "Received message",
fmt.Sprintf("%s", deliveryContext.Message().Data))
err = deliveryContext.Accept(context.Background())
if err != nil {
rmq.Error("Error accepting message", err)
return
}
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}