// Source: golang-demo/rabbitmq_example/publisher/ssl/publisher_with_ssl.go
//
// 224 lines
// 5.9 KiB
// Go

package main
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"time"
"github.com/Azure/go-amqp"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
"github.com/youmark/pkcs8"
)
// main connects to a RabbitMQ broker over TLS using a client certificate
// (SASL EXTERNAL), declares a dead-letter exchange/queue plus a main
// exchange/queue that dead-letters into them, and then publishes 100 messages
// to the main exchange, logging the outcome the broker reports for each one.
//
// Every setup failure is logged and makes main return early.
func main() {
	const (
		exchangeName = "getting-started-go-exchange"
		queueName    = "getting-started-go-queue"
		routingKey   = "routing-key"

		// Declare Dead Letter Exchange names.
		deadExchangeName = "getting-started-go-dead-letter-exchange"
		deadQueueName    = "getting-started-go-dead-letter-queue"
		deadRoutingKey   = "dead-letter-routing-key"
	)

	ctx := context.Background()

	rmq.Info("Getting started with AMQP Go AMQP 1.0 Client")

	// Create a channel to receive connection state change notifications.
	// The goroutine ends when the channel is closed in the deferred cleanup.
	stateChanged := make(chan *rmq.StateChanged, 1)
	go func(ch chan *rmq.StateChanged) {
		for statusChanged := range ch {
			rmq.Info("[producer connection]", "Status changed", statusChanged)
		}
	}(stateChanged)

	// NOTE(review): the private-key password is hard-coded in source; consider
	// loading it from the environment or a secret store instead.
	tlsConfig, err := loadClientTLSConfig(
		"../../certs/ca_certificate.pem",
		"../../certs/client_certificate.pem",
		"../../certs/client_key.pem",
		"douxu-buntu22", // the server name should match the name on the certificate
		[]byte("ecl3000"),
	)
	if err != nil {
		rmq.Error("Error building TLS config", "error", err)
		return
	}

	// Setup environment with ssl
	env := rmq.NewEnvironment("amqps://192.168.2.104:5671/", &rmq.AmqpConnOptions{
		SASLType:  amqp.SASLTypeExternal(""),
		TLSConfig: tlsConfig,
	})
	// env = rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil)

	// Open connection
	amqpConnection, err := env.NewConnection(ctx)
	if err != nil {
		rmq.Error("Error opening connection", "error", err)
		return
	}
	amqpConnection.NotifyStatusChange(stateChanged)
	defer func() {
		// Use a local error so the cleanup never overwrites main's err.
		if closeErr := env.CloseConnections(ctx); closeErr != nil {
			rmq.Error("Error closing connection", "error", closeErr)
		}
		close(stateChanged)
	}()
	rmq.Info("AMQP connection opened for producer")

	// Create management interface
	management := amqpConnection.Management()

	// Dead-letter topology: exchange, queue and the binding between them.
	_, err = management.DeclareExchange(ctx, &rmq.TopicExchangeSpecification{
		Name: deadExchangeName,
	})
	if err != nil {
		rmq.Error("Error declaring dead letter exchange", "error", err)
		return
	}
	_, err = management.DeclareQueue(ctx, &rmq.ClassicQueueSpecification{
		Name: deadQueueName,
	})
	if err != nil {
		rmq.Error("Error declaring dead letter queue", "error", err)
		return
	}
	_, err = management.Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{
		SourceExchange:   deadExchangeName,
		DestinationQueue: deadQueueName,
		BindingKey:       deadRoutingKey,
	})
	if err != nil {
		rmq.Error("Error binding dead letter queue", "error", err)
		return
	}

	// Declare exchange
	_, err = management.DeclareExchange(ctx, &rmq.TopicExchangeSpecification{
		Name: exchangeName,
	})
	if err != nil {
		rmq.Error("Error declaring exchange", "error", err)
		return
	}

	// Declare queue with DLX
	_, err = management.DeclareQueue(ctx, &rmq.ClassicQueueSpecification{
		Name: queueName,
		// add config of test DLX and DLQ
		MaxLength:            50,
		DeadLetterExchange:   deadExchangeName,
		DeadLetterRoutingKey: deadRoutingKey,
	})
	if err != nil {
		rmq.Error("Error declaring queue", "error", err)
		return
	}

	// Bind queue to exchange
	_, err = management.Bind(ctx, &rmq.ExchangeToQueueBindingSpecification{
		SourceExchange:   exchangeName,
		DestinationQueue: queueName,
		BindingKey:       routingKey,
	})
	if err != nil {
		rmq.Error("Error binding", "error", err)
		return
	}

	// Create publisher
	publisher, err := amqpConnection.NewPublisher(ctx, &rmq.ExchangeAddress{
		Exchange: exchangeName,
		Key:      routingKey,
	}, nil)
	if err != nil {
		rmq.Error("Error creating publisher", "error", err)
		return
	}
	// Deferred calls run last-in-first-out, so the publisher is closed before
	// the environment's connections are.
	defer func() {
		if closeErr := publisher.Close(ctx); closeErr != nil {
			rmq.Error("Error closing publisher", "error", closeErr)
		}
	}()

	// Publish messages
	for i := range 100 {
		body := []byte(fmt.Sprintf("Hello, World!%d", i))
		publishResult, err := publisher.Publish(ctx, rmq.NewMessage(body))
		if err != nil {
			rmq.Error("Error publishing message", "error", err)
			// Back off briefly before moving on to the next message.
			time.Sleep(1 * time.Second)
			continue
		}
		switch outcome := publishResult.Outcome.(type) {
		case *rmq.StateAccepted:
			rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
		case *rmq.StateReleased:
			rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
		case *rmq.StateRejected:
			rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
			if outcome.Error != nil {
				rmq.Warn("[Publisher]", "Message rejected with error", outcome.Error)
			}
		default:
			rmq.Warn("[Publisher]", "Message state", outcome)
		}
	}
	rmq.Info("producer finished sending messages")
}

// loadClientTLSConfig builds the TLS configuration used for the broker
// connection.
//
// caPath is a PEM file with the CA certificate(s) to trust, certPath is the
// PEM client certificate, and keyPath is a PEM file whose first block must be
// a password-protected PKCS#8 key ("ENCRYPTED PRIVATE KEY") that keyPassword
// decrypts. serverName is the name the server certificate is verified against.
//
// It returns a non-nil error if any file cannot be read, the CA file contains
// no usable certificate, the key block is missing or of the wrong type, the
// key cannot be decrypted, or the certificate and key do not form a pair.
func loadClientTLSConfig(caPath, certPath, keyPath, serverName string, keyPassword []byte) (*tls.Config, error) {
	// Load the CA certificate into a fresh pool.
	caPEM, err := os.ReadFile(caPath)
	if err != nil {
		return nil, fmt.Errorf("read ca file: %w", err)
	}
	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was added; an empty
	// pool would only surface later as an opaque handshake failure.
	if !caCertPool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no valid CA certificate found in %q", caPath)
	}

	// Read the private key file.
	keyPEM, err := os.ReadFile(keyPath)
	if err != nil {
		return nil, fmt.Errorf("read private key file: %w", err)
	}

	// Decode the PEM-encoded private key.
	block, _ := pem.Decode(keyPEM)
	if block == nil {
		return nil, fmt.Errorf("no PEM block found in private key file %q", keyPath)
	}
	if block.Type != "ENCRYPTED PRIVATE KEY" {
		return nil, fmt.Errorf("unexpected PEM block type %q in %q, want %q",
			block.Type, keyPath, "ENCRYPTED PRIVATE KEY")
	}

	// Decrypt the PKCS#8 private key with the password.
	privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, keyPassword)
	if err != nil {
		return nil, fmt.Errorf("parse encrypted private key: %w", err)
	}

	// Re-encode the decrypted key as an unencrypted PKCS#8 PEM block, which is
	// the form tls.X509KeyPair accepts.
	keyDER, err := x509.MarshalPKCS8PrivateKey(privateKey)
	if err != nil {
		return nil, fmt.Errorf("marshal private key: %w", err)
	}
	plainKeyPEM := pem.EncodeToMemory(&pem.Block{
		Type:  "PRIVATE KEY",
		Bytes: keyDER,
	})

	certPEM, err := os.ReadFile(certPath)
	if err != nil {
		return nil, fmt.Errorf("read cert file: %w", err)
	}
	clientCert, err := tls.X509KeyPair(certPEM, plainKeyPEM)
	if err != nil {
		return nil, fmt.Errorf("load client cert: %w", err)
	}

	return &tls.Config{
		Certificates:       []tls.Certificate{clientCert},
		RootCAs:            caCertPool,
		InsecureSkipVerify: false,
		ServerName:         serverName,
	}, nil
}