2025-02-17 22:49:12 +08:00
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
exchangeName := "getting-started-go-exchange"
|
|
|
|
|
queueName := "getting-started-go-queue"
|
|
|
|
|
routingKey := "routing-key"
|
|
|
|
|
|
2025-03-05 16:46:28 +08:00
|
|
|
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
|
2025-02-17 22:49:12 +08:00
|
|
|
|
|
|
|
|
// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
|
|
|
|
|
amqpConnection, err := env.NewConnection(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error opening connection", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create the management interface for the connection
|
|
|
|
|
// so we can declare exchanges, queues, and bindings
|
|
|
|
|
management := amqpConnection.Management()
|
|
|
|
|
// TopicExchangeSpecification but can be also DirectExchangeSpecification/FanOutExchangeSpecification
|
|
|
|
|
_, err = management.DeclareExchange(context.Background(), &rmq.TopicExchangeSpecification{
|
|
|
|
|
Name: exchangeName,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error declaring exchange", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Declare a Quorum queue
|
|
|
|
|
// QuorumQueueSpecification but can be also ClassicQueueSpecification,
|
|
|
|
|
// AutoGeneratedQueueSpecification, and StreamQueueSpecification
|
|
|
|
|
_, err = management.DeclareQueue(context.Background(), &rmq.QuorumQueueSpecification{
|
|
|
|
|
Name: queueName,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error declaring queue", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Bind the queue to the exchange
|
|
|
|
|
bindingPath, err := management.Bind(context.Background(), &rmq.ExchangeToQueueBindingSpecification{
|
|
|
|
|
SourceExchange: exchangeName,
|
|
|
|
|
DestinationQueue: queueName,
|
|
|
|
|
BindingKey: routingKey,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error binding", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
|
|
|
|
|
Exchange: exchangeName,
|
|
|
|
|
Key: routingKey,
|
2025-02-27 20:58:59 +08:00
|
|
|
}, nil)
|
2025-02-17 22:49:12 +08:00
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error creating publisher", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
publishResult, err := publisher.Publish(context.Background(),
|
|
|
|
|
rmq.NewMessage([]byte(fmt.Sprint("Hello AMQP 1.0 - id:", i))))
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error publishing message", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch publishResult.Outcome.(type) {
|
|
|
|
|
// publish result
|
|
|
|
|
case *rmq.StateAccepted:
|
|
|
|
|
rmq.Info("Message accepted", "message", publishResult.Message.GetData())
|
|
|
|
|
default:
|
|
|
|
|
rmq.Error("Message not accepted", "outcome", publishResult.Outcome)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error creating consumer", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
deliveryContext, err := consumer.Receive(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error receiving message", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
rmq.Info("Received message", "message", deliveryContext.Message().GetData())
|
|
|
|
|
// Accept the message. Message will be removed from the queue
|
|
|
|
|
err = deliveryContext.Accept(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
rmq.Error("Error accepting message", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close the publisher
|
|
|
|
|
_ = publisher.Close(context.Background())
|
|
|
|
|
|
|
|
|
|
// Close the consumer
|
|
|
|
|
_ = consumer.Close(context.Background())
|
|
|
|
|
|
|
|
|
|
// Delete binding
|
|
|
|
|
_ = management.Unbind(context.Background(), bindingPath)
|
|
|
|
|
|
|
|
|
|
// Delete the queue
|
|
|
|
|
_ = management.DeleteQueue(context.Background(), queueName)
|
|
|
|
|
|
|
|
|
|
// Delete the exchange
|
|
|
|
|
_ = management.DeleteExchange(context.Background(), exchangeName)
|
|
|
|
|
|
|
|
|
|
// Close the connection
|
|
|
|
|
_ = amqpConnection.Close(context.Background())
|
|
|
|
|
|
|
|
|
|
// close the connection with env
|
|
|
|
|
_ = env.CloseConnections(context.Background())
|
|
|
|
|
}
|