package main import ( "context" "fmt" "time" rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) func main() { exchangeName := "getting-started-go-exchange" queueName := "getting-started-go-queue" routingKey := "routing-key" // Delcare Dead Letter Exchange deadExchangeName := "getting-started-go-dead-letter-exchange" deadQueueName := "getting-started-go-dead-letter-queue" deadRoutingKey := "dead-letter-routing-key" rmq.Info("Getting started with AMQP Go AMQP 1.0 Client") // Create a channel to receive connection state change notifications stateChanged := make(chan *rmq.StateChanged, 1) go func(ch chan *rmq.StateChanged) { for statusChanged := range ch { rmq.Info("[producer connection]", "Status changed", statusChanged) } }(stateChanged) env := rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil) // Open connection amqpConnection, err := env.NewConnection(context.Background()) if err != nil { rmq.Error("Error opening connection", err) return } amqpConnection.NotifyStatusChange(stateChanged) defer func() { err = env.CloseConnections(context.Background()) if err != nil { rmq.Error("Error closing connection: %v\n", err) } close(stateChanged) }() rmq.Info("AMQP connection opened for producer") // Create management interface management := amqpConnection.Management() _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ Name: deadExchangeName, }) if err != nil { rmq.Error("Error declaring dead letter exchange", err) return } _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ Name: deadQueueName, }) if err != nil { rmq.Error("Error declaring dead letter queue", err) return } // Bind queue to exchange _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ SourceExchange: deadExchangeName, DestinationQueue: deadQueueName, BindingKey: deadRoutingKey, }) if err != nil { rmq.Error("Error binding", err) return } // Declare exchange _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ Name: exchangeName, }) if err != nil { rmq.Error("Error declaring exchange", err) return } // Declare queue with DLX _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ Name: queueName, // add config of test DLX and DLQ MaxLength: 50, DeadLetterExchange: deadExchangeName, DeadLetterRoutingKey: deadRoutingKey, }) if err != nil { rmq.Error("Error declaring queue", err) return } // Bind queue to exchange _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ SourceExchange: exchangeName, DestinationQueue: queueName, BindingKey: routingKey, }) if err != nil { rmq.Error("Error binding", err) return } // Create publisher publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{ Exchange: exchangeName, Key: routingKey, }, nil) if err != nil { rmq.Error("Error creating publisher", err) return } defer publisher.Close(context.Background()) // Publish messages for i := range 100 { publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) if err != nil { rmq.Error("Error publishing message", "error", err) time.Sleep(1 * time.Second) continue } switch publishResult.Outcome.(type) { case *rmq.StateAccepted: rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) case *rmq.StateReleased: rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) case *rmq.StateRejected: rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) stateType := publishResult.Outcome.(*rmq.StateRejected) if stateType.Error != nil { rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) } default: rmq.Warn("Message state: %v", publishResult.Outcome) } } rmq.Info("producer finished sending messages") }