149 lines
4.0 KiB
Go
149 lines
4.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
|
)
|
|
|
|
func main() {
|
|
exchangeName := "getting-started-go-exchange"
|
|
queueName := "getting-started-go-queue"
|
|
routingKey := "routing-key"
|
|
|
|
// Delcare Dead Letter Exchange
|
|
deadExchangeName := "getting-started-go-dead-letter-exchange"
|
|
deadQueueName := "getting-started-go-dead-letter-queue"
|
|
deadRoutingKey := "dead-letter-routing-key"
|
|
|
|
rmq.Info("Getting started with AMQP Go AMQP 1.0 Client")
|
|
|
|
// Create a channel to receive connection state change notifications
|
|
stateChanged := make(chan *rmq.StateChanged, 1)
|
|
go func(ch chan *rmq.StateChanged) {
|
|
for statusChanged := range ch {
|
|
rmq.Info("[producer connection]", "Status changed", statusChanged)
|
|
}
|
|
}(stateChanged)
|
|
|
|
env := rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil)
|
|
|
|
// Open connection
|
|
amqpConnection, err := env.NewConnection(context.Background())
|
|
if err != nil {
|
|
rmq.Error("Error opening connection", err)
|
|
return
|
|
}
|
|
amqpConnection.NotifyStatusChange(stateChanged)
|
|
defer func() {
|
|
err = env.CloseConnections(context.Background())
|
|
if err != nil {
|
|
rmq.Error("Error closing connection: %v\n", err)
|
|
}
|
|
close(stateChanged)
|
|
}()
|
|
|
|
rmq.Info("AMQP connection opened for producer")
|
|
|
|
// Create management interface
|
|
management := amqpConnection.Management()
|
|
|
|
_, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{
|
|
Name: deadExchangeName,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error declaring dead letter exchange", err)
|
|
return
|
|
}
|
|
|
|
_, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{
|
|
Name: deadQueueName,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error declaring dead letter queue", err)
|
|
return
|
|
}
|
|
|
|
// Bind queue to exchange
|
|
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
|
|
SourceExchange: deadExchangeName,
|
|
DestinationQueue: deadQueueName,
|
|
BindingKey: deadRoutingKey,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error binding", err)
|
|
return
|
|
}
|
|
|
|
// Declare exchange
|
|
_, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{
|
|
Name: exchangeName,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error declaring exchange", err)
|
|
return
|
|
}
|
|
|
|
// Declare queue with DLX
|
|
_, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{
|
|
Name: queueName,
|
|
// add config of test DLX and DLQ
|
|
MaxLength: 50,
|
|
DeadLetterExchange: deadExchangeName,
|
|
DeadLetterRoutingKey: deadRoutingKey,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error declaring queue", err)
|
|
return
|
|
}
|
|
|
|
// Bind queue to exchange
|
|
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
|
|
SourceExchange: exchangeName,
|
|
DestinationQueue: queueName,
|
|
BindingKey: routingKey,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error binding", err)
|
|
return
|
|
}
|
|
|
|
// Create publisher
|
|
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
|
|
Exchange: exchangeName,
|
|
Key: routingKey,
|
|
}, nil)
|
|
if err != nil {
|
|
rmq.Error("Error creating publisher", err)
|
|
return
|
|
}
|
|
defer publisher.Close(context.Background())
|
|
|
|
// Publish messages
|
|
for i := range 100 {
|
|
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
|
|
if err != nil {
|
|
rmq.Error("Error publishing message", "error", err)
|
|
time.Sleep(1 * time.Second)
|
|
continue
|
|
}
|
|
switch publishResult.Outcome.(type) {
|
|
case *rmq.StateAccepted:
|
|
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
|
|
case *rmq.StateReleased:
|
|
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
|
|
case *rmq.StateRejected:
|
|
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
|
|
stateType := publishResult.Outcome.(*rmq.StateRejected)
|
|
if stateType.Error != nil {
|
|
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
|
|
}
|
|
default:
|
|
rmq.Warn("Message state: %v", publishResult.Outcome)
|
|
}
|
|
}
|
|
rmq.Info("producer finished sending messages")
|
|
}
|