package main import ( "context" "fmt" rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) func main() { exchangeName := "getting-started-go-exchange" queueName := "getting-started-go-queue" routingKey := "routing-key" env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0) amqpConnection, err := env.NewConnection(context.Background()) if err != nil { rmq.Error("Error opening connection", err) return } // Create the management interface for the connection // so we can declare exchanges, queues, and bindings management := amqpConnection.Management() // TopicExchangeSpecification but can be also DirectExchangeSpecification/FanOutExchangeSpecification _, err = management.DeclareExchange(context.Background(), &rmq.TopicExchangeSpecification{ Name: exchangeName, }) if err != nil { rmq.Error("Error declaring exchange", err) return } // Declare a Quorum queue // QuorumQueueSpecification but can be also ClassicQueueSpecification, // AutoGeneratedQueueSpecification, and StreamQueueSpecification _, err = management.DeclareQueue(context.Background(), &rmq.QuorumQueueSpecification{ Name: queueName, }) if err != nil { rmq.Error("Error declaring queue", err) return } // Bind the queue to the exchange bindingPath, err := management.Bind(context.Background(), &rmq.ExchangeToQueueBindingSpecification{ SourceExchange: exchangeName, DestinationQueue: queueName, BindingKey: routingKey, }) if err != nil { rmq.Error("Error binding", err) return } publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{ Exchange: exchangeName, Key: routingKey, }, nil) if err != nil { rmq.Error("Error creating publisher", err) return } for i := 0; i < 10; i++ { publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte(fmt.Sprint("Hello AMQP 1.0 - id:", i)))) if err != nil { rmq.Error("Error publishing message", err) return } switch publishResult.Outcome.(type) { // publish result case *rmq.StateAccepted: rmq.Info("Message accepted", "message", publishResult.Message.GetData()) default: rmq.Error("Message not accepted", "outcome", publishResult.Outcome) } } consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) if err != nil { rmq.Error("Error creating consumer", err) return } for i := 0; i < 10; i++ { deliveryContext, err := consumer.Receive(context.Background()) if err != nil { rmq.Error("Error receiving message", err) return } rmq.Info("Received message", "message", deliveryContext.Message().GetData()) // Accept the message. Message will be removed from the queue err = deliveryContext.Accept(context.Background()) if err != nil { rmq.Error("Error accepting message", err) return } } // Close the publisher _ = publisher.Close(context.Background()) // Close the consumer _ = consumer.Close(context.Background()) // Delete binding _ = management.Unbind(context.Background(), bindingPath) // Delete the queue _ = management.DeleteQueue(context.Background(), queueName) // Delete the exchange _ = management.DeleteExchange(context.Background(), exchangeName) // Close the connection _ = amqpConnection.Close(context.Background()) // close the connection with env _ = env.CloseConnections(context.Background()) }