244 lines
6.1 KiB
Go
244 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/pem"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
|
|
amqp "github.com/Azure/go-amqp"
|
|
amqp091 "github.com/rabbitmq/amqp091-go"
|
|
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
|
|
"github.com/youmark/pkcs8"
|
|
)
|
|
|
|
func main() {
|
|
exchangeName := "getting-started-go-exchange"
|
|
queueName := "getting-started-go-queue"
|
|
routingKey := "routing-key"
|
|
|
|
// Delcare Dead Letter Exchange
|
|
deadExchangeName := "getting-started-go-dead-letter-exchange"
|
|
deadQueueName := "getting-started-go-dead-letter-queue"
|
|
deadRoutingKey := "dead-letter-routing-key"
|
|
|
|
rmq.Info("Starting AMQP Go AMQP 1.0 Consumer")
|
|
|
|
// Create a channel to receive connection state change notifications
|
|
stateChanged := make(chan *rmq.StateChanged, 1)
|
|
go func(ch chan *rmq.StateChanged) {
|
|
for statusChanged := range ch {
|
|
rmq.Info("[consumer connection]", "Status changed", statusChanged)
|
|
}
|
|
}(stateChanged)
|
|
|
|
// Load CA certificate
|
|
caCert, err := os.ReadFile("../../certs/ca_certificate.pem")
|
|
if err != nil {
|
|
rmq.Error("read ca file failed", err)
|
|
return
|
|
}
|
|
// Create a CA certificate pool and add the CA certificate to it
|
|
caCertPool := x509.NewCertPool()
|
|
caCertPool.AppendCertsFromPEM(caCert)
|
|
|
|
// Load client cert
|
|
// 读取私钥文件
|
|
keyData, err := os.ReadFile("../../certs/client_key.pem")
|
|
if err != nil {
|
|
rmq.Error("read private key file failed", err)
|
|
return
|
|
}
|
|
|
|
// 解码PEM编码的私钥
|
|
block, _ := pem.Decode(keyData)
|
|
if block == nil || block.Type != "ENCRYPTED PRIVATE KEY" {
|
|
rmq.Error("decode PEM block containing private key failed", err)
|
|
return
|
|
}
|
|
|
|
// 使用密码解密 PKCS#8 私钥
|
|
password := []byte("ecl3000")
|
|
privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password)
|
|
if err != nil {
|
|
rmq.Error("parse private key failed by password failed", err)
|
|
return
|
|
}
|
|
|
|
// 还原为 PEM 格式
|
|
pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
pemBlock := &pem.Block{
|
|
Type: "PRIVATE KEY",
|
|
Bytes: pemBytes,
|
|
}
|
|
|
|
certPEM, err := os.ReadFile("../../certs/client_certificate.pem")
|
|
if err != nil {
|
|
rmq.Error("read cert file failed", err)
|
|
return
|
|
}
|
|
|
|
clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock))
|
|
if err != nil {
|
|
rmq.Error("load client cert failed", err)
|
|
return
|
|
}
|
|
|
|
tlsConfig := &tls.Config{
|
|
Certificates: []tls.Certificate{clientCert},
|
|
RootCAs: caCertPool,
|
|
InsecureSkipVerify: false,
|
|
ServerName: "douxu-buntu22", // the server name should match the name on the certificate
|
|
}
|
|
|
|
// Setup environment with ssl
|
|
env := rmq.NewEnvironment("amqps://192.168.2.104:5671/", &rmq.AmqpConnOptions{
|
|
SASLType: amqp.SASLTypeExternal(""),
|
|
TLSConfig: tlsConfig,
|
|
})
|
|
|
|
// Open connection
|
|
amqpConnection, err := env.NewConnection(context.Background())
|
|
if err != nil {
|
|
rmq.Error("Error opening connection", err)
|
|
return
|
|
}
|
|
amqpConnection.NotifyStatusChange(stateChanged)
|
|
defer func() {
|
|
err = env.CloseConnections(context.Background())
|
|
if err != nil {
|
|
rmq.Error("Error closing connection: %v\n", err)
|
|
}
|
|
close(stateChanged)
|
|
}()
|
|
|
|
rmq.Info("AMQP connection opened for consumer")
|
|
|
|
// Create management interface
|
|
management := amqpConnection.Management()
|
|
|
|
// Bind queue to exchange
|
|
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
|
|
SourceExchange: deadExchangeName,
|
|
DestinationQueue: deadQueueName,
|
|
BindingKey: deadRoutingKey,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error binding", err)
|
|
return
|
|
}
|
|
|
|
// Bind queue to exchange (idempotent operation)
|
|
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
|
|
SourceExchange: exchangeName,
|
|
DestinationQueue: queueName,
|
|
BindingKey: routingKey,
|
|
})
|
|
if err != nil {
|
|
rmq.Error("Error binding", err)
|
|
return
|
|
}
|
|
|
|
// use go func to contain this dead consumer
|
|
go func() {
|
|
conn, err := amqp091.DialTLS_ExternalAuth("amqps://192.168.2.104:5671/", tlsConfig)
|
|
// conn, err := amqp091.Dial("amqp://ecl3000:ecl3000@192.168.2.104:5672/")
|
|
failOnError(err, "Failed to connect to RabbitMQ")
|
|
defer conn.Close()
|
|
|
|
ch, err := conn.Channel()
|
|
failOnError(err, "Failed to open a channel")
|
|
defer ch.Close()
|
|
|
|
// ch.ExchangeDeclare()
|
|
|
|
// use QueueDeclare func make sure the queue exists
|
|
q, err := ch.QueueDeclare(
|
|
deadQueueName, // name
|
|
true, // durable
|
|
false, // delete when unused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
failOnError(err, "Failed to declare a queue")
|
|
|
|
// ch.QueueBind()
|
|
|
|
msgs, err := ch.Consume(
|
|
q.Name, // queue
|
|
"", // consumer
|
|
false, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
failOnError(err, "Failed to register a consumer")
|
|
|
|
for d := range msgs {
|
|
log.Printf("Received a message from DLQ: %s", d.Body)
|
|
}
|
|
}()
|
|
|
|
// Create consumer
|
|
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
|
|
if err != nil {
|
|
rmq.Error("Error creating consumer", err)
|
|
return
|
|
}
|
|
defer consumer.Close(context.Background())
|
|
|
|
consumerContext, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Set up signal handling for graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
// Start a goroutine to handle shutdown signals
|
|
go func() {
|
|
<-sigChan
|
|
rmq.Info("Received shutdown signal, stopping consumer...")
|
|
cancel()
|
|
}()
|
|
|
|
// Consume messages
|
|
rmq.Info("Consumer ready to receive messages")
|
|
for {
|
|
deliveryContext, err := consumer.Receive(consumerContext)
|
|
if errors.Is(err, context.Canceled) {
|
|
rmq.Info("[Consumer]", "consumer closed gracefully")
|
|
return
|
|
}
|
|
if err != nil {
|
|
rmq.Error("[Consumer]", "Error receiving message", err)
|
|
return
|
|
}
|
|
|
|
rmq.Info("[Consumer]", "Received message",
|
|
fmt.Sprintf("%s", deliveryContext.Message().Data))
|
|
|
|
err = deliveryContext.Requeue(context.Background())
|
|
if err != nil {
|
|
rmq.Error("release message to queue", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// failOnError is a no-op when err is nil. Otherwise it logs "<msg>: <err>"
// through the standard logger and panics with that same text.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", msg, err)
}
|