Broadcast to tmq queues example (#43)
* broadcast example --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
This commit is contained in:
parent
d84c3d22de
commit
f52c7983ce
|
|
@ -8,4 +8,5 @@
|
||||||
- [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties.
|
- [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties.
|
||||||
- [Video](video) - From the YouTube tutorial [AMQP 1.0 with Golang](https://youtu.be/iR1JUFh3udI)
|
- [Video](video) - From the YouTube tutorial [AMQP 1.0 with Golang](https://youtu.be/iR1JUFh3udI)
|
||||||
- [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client.
|
- [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client.
|
||||||
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
|
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
|
||||||
|
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
|
||||||
|
|
@ -0,0 +1,110 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
|
||||||
|
|
||||||
|
// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
|
||||||
|
amqpConnection, err := env.NewConnection(context.Background())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
rmq.Error("Error opening connection", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const broadcastExchange = "broadcast"
|
||||||
|
// Create the management interface for the connection
|
||||||
|
// so we can declare exchanges, queues, and bindings
|
||||||
|
management := amqpConnection.Management()
|
||||||
|
_, err = management.DeclareExchange(context.Background(), &rmq.FanOutExchangeSpecification{
|
||||||
|
Name: broadcastExchange,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
rmq.Error("Error declaring exchange", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
// create temp queues
|
||||||
|
q, err := management.DeclareQueue(context.Background(), &rmq.AutoGeneratedQueueSpecification{
|
||||||
|
IsAutoDelete: true,
|
||||||
|
IsExclusive: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
rmq.Error("Error DeclareQueue", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
|
||||||
|
SourceExchange: broadcastExchange,
|
||||||
|
DestinationQueue: q.Name(),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
rmq.Error("Error binding", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(idx int) {
|
||||||
|
consumer, err := amqpConnection.NewConsumer(context.Background(), q.Name(), nil)
|
||||||
|
if err != nil {
|
||||||
|
rmq.Error("Error creating consumer", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
dcx, err1 := consumer.Receive(context.Background())
|
||||||
|
if err1 != nil {
|
||||||
|
rmq.Error("Error receiving message", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rmq.Info("[Consumer]", "index", idx, "msg", fmt.Sprintf("%s", dcx.Message().Data), "[queue]", q.Name())
|
||||||
|
err1 = dcx.Accept(context.Background())
|
||||||
|
if err1 != nil {
|
||||||
|
rmq.Error("Error accepting message", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
|
||||||
|
Exchange: broadcastExchange,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
rmq.Error("Error creating publisher", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 10_000; i++ {
|
||||||
|
publishResult, err := publisher.Publish(context.Background(),
|
||||||
|
rmq.NewMessage([]byte("Hello AMQP 1.0 - id:"+fmt.Sprintf("%d", i))))
|
||||||
|
if err != nil {
|
||||||
|
rmq.Error("Error publishing message", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch publishResult.Outcome.(type) {
|
||||||
|
// publish result
|
||||||
|
case *rmq.StateAccepted:
|
||||||
|
rmq.Info("[Publisher] Message accepted", "message", publishResult.Message.GetData())
|
||||||
|
default:
|
||||||
|
rmq.Error("[Publisher] Message not accepted", "outcome", publishResult.Outcome)
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
// press any key to close the connection
|
||||||
|
|
||||||
|
var input string
|
||||||
|
_, _ = fmt.Scanln(&input)
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue