modelRT/mq/publisher_with_ssh_091.go

161 lines
4.1 KiB
Go

// Package mq provides read and write access to message queue services.
package mq
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"log"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/youmark/pkcs8"
)
// main publishes 100 plain-text test messages to a RabbitMQ broker over a
// mutually authenticated TLS connection and logs the broker's publisher
// confirm for each one.
//
// Steps, in order:
//  1. Build a tls.Config from the CA certificate, the client certificate and
//     the password-protected PKCS#8 client key under ../certs.
//  2. Dial the broker with SASL EXTERNAL authentication and a 10s heartbeat.
//  3. Declare the dead-letter exchange/queue, then the main exchange and a
//     queue limited to 50 messages that dead-letters to the former.
//  4. Put the channel into confirm mode and publish the messages one by one,
//     waiting up to 5s for each confirmation.
//
// Any setup failure terminates the process via log.Fatal.
//
// NOTE(review): this function is named main but the file declares package mq,
// so it is not a program entry point as written — confirm the intended package.
func main() {
	exchangeName := "getting-started-go-exchange"
	queueName := "getting-started-go-queue"
	routingKey := "routing-key"
	deadExchangeName := "getting-started-go-dead-letter-exchange"
	deadQueueName := "getting-started-go-dead-letter-queue"
	deadRoutingKey := "dead-letter-routing-key"

	// --- TLS material -------------------------------------------------------

	caCert, err := os.ReadFile("../certs/ca_certificate.pem")
	if err != nil {
		log.Fatal("read ca file failed: ", err)
	}
	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when nothing could be parsed; failing
	// here is clearer than an opaque handshake error against an empty pool.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		log.Fatal("append ca certificate failed: no valid PEM certificate found")
	}

	keyData, err := os.ReadFile("../certs/client_key.pem")
	if err != nil {
		log.Fatal("read private key file failed: ", err)
	}
	// pem.Decode returns a nil block when the input holds no PEM data; guard
	// against it so block.Bytes below cannot dereference nil.
	block, _ := pem.Decode(keyData)
	if block == nil {
		log.Fatal("decode private key failed: no PEM block found")
	}

	// NOTE(review): the key password is hard-coded in source; consider loading
	// it from configuration or the environment instead.
	password := []byte("ecl3000")
	privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password)
	if err != nil {
		log.Fatal("parse private key failed: ", err)
	}

	// tls.X509KeyPair needs an unencrypted PEM key, so the decrypted key is
	// re-marshalled as PKCS#8 and PEM-encoded in memory.
	pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
	if err != nil {
		log.Fatal("marshal private key failed: ", err)
	}
	pemBlock := &pem.Block{Type: "PRIVATE KEY", Bytes: pemBytes}

	certPEM, err := os.ReadFile("../certs/client_certificate.pem")
	if err != nil {
		log.Fatal("read client cert file failed: ", err)
	}
	clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock))
	if err != nil {
		log.Fatal("load client cert failed: ", err)
	}

	tlsConfig := &tls.Config{
		Certificates:       []tls.Certificate{clientCert},
		RootCAs:            caCertPool,
		InsecureSkipVerify: false,
		// The URL below uses an IP address, so the name to verify the server
		// certificate against is set explicitly.
		ServerName: "douxu-buntu22",
	}

	// --- Connection and channel ---------------------------------------------

	url := "amqps://192.168.2.104:5671/"
	conn, err := amqp.DialConfig(url, amqp.Config{
		TLSClientConfig: tlsConfig,
		SASL:            []amqp.Authentication{&amqp.ExternalAuth{}},
		Heartbeat:       10 * time.Second,
	})
	if err != nil {
		log.Fatal("Error opening connection: ", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal("Error opening channel: ", err)
	}
	defer ch.Close()

	// Log the reason once the connection is closed.
	go func() {
		closeErr := <-conn.NotifyClose(make(chan *amqp.Error))
		log.Printf("Connection closed: %v", closeErr)
	}()

	// --- Topology: dead-letter side first, then the main exchange/queue ------

	err = ch.ExchangeDeclare(deadExchangeName, "topic", true, false, false, false, nil)
	if err != nil {
		log.Fatal("Error declaring dead letter exchange: ", err)
	}
	_, err = ch.QueueDeclare(deadQueueName, true, false, false, false, nil)
	if err != nil {
		log.Fatal("Error declaring dead letter queue: ", err)
	}
	err = ch.QueueBind(deadQueueName, deadRoutingKey, deadExchangeName, false, nil)
	if err != nil {
		log.Fatal("Error binding dead letter: ", err)
	}

	err = ch.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil)
	if err != nil {
		log.Fatal("Error declaring exchange: ", err)
	}
	// The main queue is capped at 50 messages and points at the dead-letter
	// exchange/routing key declared above.
	args := amqp.Table{
		"x-max-length":              int32(50),
		"x-dead-letter-exchange":    deadExchangeName,
		"x-dead-letter-routing-key": deadRoutingKey,
	}
	_, err = ch.QueueDeclare(queueName, true, false, false, false, args)
	if err != nil {
		log.Fatal("Error declaring queue: ", err)
	}
	err = ch.QueueBind(queueName, routingKey, exchangeName, false, nil)
	if err != nil {
		log.Fatal("Error binding queue: ", err)
	}

	// --- Publishing with confirms -------------------------------------------

	if err := ch.Confirm(false); err != nil {
		log.Fatal("Channel could not be put into confirm mode: ", err)
	}
	confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

	for i := 0; i < 100; i++ {
		msgBody := fmt.Sprintf("Hello, World!%d", i)

		// Bound each publish call to 5s; the context is released right after
		// the call rather than deferred so it does not pile up across the loop.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err = ch.PublishWithContext(ctx,
			exchangeName, // exchange
			routingKey,   // routing key
			false,        // mandatory
			false,        // immediate
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(msgBody),
			})
		cancel()
		if err != nil {
			// A failed publish is logged and skipped, not retried.
			log.Printf("Error publishing message: %v", err)
			time.Sleep(1 * time.Second)
			continue
		}

		// NOTE(review): confirmations are matched to messages purely by
		// arrival order; after a timeout a late confirmation would presumably
		// be attributed to the next message — verify whether that matters.
		select {
		case confirm := <-confirms:
			if confirm.Ack {
				log.Printf("[Publisher] Message %d accepted", i)
			} else {
				log.Printf("[Publisher] Message %d Nack (Rejected by RabbitMQ)", i)
			}
		case <-time.After(5 * time.Second):
			log.Printf("[Publisher] Timeout waiting for confirm for message %d", i)
		}
	}
	log.Println("Producer finished sending messages")
}