package main import ( "context" "crypto/tls" "crypto/x509" "encoding/pem" "fmt" "os" "time" "github.com/Azure/go-amqp" rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" "github.com/youmark/pkcs8" ) func main() { exchangeName := "getting-started-go-exchange" queueName := "getting-started-go-queue" routingKey := "routing-key" // Delcare Dead Letter Exchange deadExchangeName := "getting-started-go-dead-letter-exchange" deadQueueName := "getting-started-go-dead-letter-queue" deadRoutingKey := "dead-letter-routing-key" rmq.Info("Getting started with AMQP Go AMQP 1.0 Client") // Create a channel to receive connection state change notifications stateChanged := make(chan *rmq.StateChanged, 1) go func(ch chan *rmq.StateChanged) { for statusChanged := range ch { rmq.Info("[producer connection]", "Status changed", statusChanged) } }(stateChanged) // Load CA certificate caCert, err := os.ReadFile("../../certs/ca_certificate.pem") if err != nil { rmq.Error("read ca file failed", err) return } // Create a CA certificate pool and add the CA certificate to it caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) // Load client cert // 读取私钥文件 keyData, err := os.ReadFile("../../certs/client_key.pem") if err != nil { rmq.Error("read private key file failed", err) return } // 解码PEM编码的私钥 block, _ := pem.Decode(keyData) if block == nil || block.Type != "ENCRYPTED PRIVATE KEY" { rmq.Error("decode PEM block containing private key failed", err) return } // 使用密码解密 PKCS#8 私钥 password := []byte("ecl3000") privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) if err != nil { rmq.Error("parse private key failed by password failed", err) return } // 还原为 PEM 格式 pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) if err != nil { panic(err) } pemBlock := &pem.Block{ Type: "PRIVATE KEY", Bytes: pemBytes, } certPEM, err := os.ReadFile("../../certs/client_certificate.pem") if err != nil { rmq.Error("read cert file failed", err) return } clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock)) if err != nil { rmq.Error("load client cert failed", err) return } tlsConfig := &tls.Config{ Certificates: []tls.Certificate{clientCert}, RootCAs: caCertPool, InsecureSkipVerify: false, ServerName: "douxu-buntu22", // the server name should match the name on the certificate } // Setup environment with ssl env := rmq.NewEnvironment("amqps://192.168.2.104:5671/", &rmq.AmqpConnOptions{ SASLType: amqp.SASLTypeExternal(""), TLSConfig: tlsConfig, }) // env = rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil) // Open connection amqpConnection, err := env.NewConnection(context.Background()) if err != nil { rmq.Error("Error opening connection", err) return } amqpConnection.NotifyStatusChange(stateChanged) defer func() { err = env.CloseConnections(context.Background()) if err != nil { rmq.Error("Error closing connection: %v\n", err) } close(stateChanged) }() rmq.Info("AMQP connection opened for producer") // Create management interface management := amqpConnection.Management() _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ Name: deadExchangeName, }) if err != nil { rmq.Error("Error declaring dead letter exchange", err) return } _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ Name: deadQueueName, }) if err != nil { rmq.Error("Error declaring dead letter queue", err) return } // Bind queue to exchange _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ SourceExchange: deadExchangeName, DestinationQueue: deadQueueName, BindingKey: deadRoutingKey, }) if err != nil { rmq.Error("Error binding", err) return } // Declare exchange _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ Name: exchangeName, }) if err != nil { rmq.Error("Error declaring exchange", err) return } // Declare queue with DLX _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ Name: queueName, // add config of test DLX and DLQ MaxLength: 50, DeadLetterExchange: deadExchangeName, DeadLetterRoutingKey: deadRoutingKey, }) if err != nil { rmq.Error("Error declaring queue", err) return } // Bind queue to exchange _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ SourceExchange: exchangeName, DestinationQueue: queueName, BindingKey: routingKey, }) if err != nil { rmq.Error("Error binding", err) return } // Create publisher publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{ Exchange: exchangeName, Key: routingKey, }, nil) if err != nil { rmq.Error("Error creating publisher", err) return } defer publisher.Close(context.Background()) // Publish messages for i := range 100 { publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) if err != nil { rmq.Error("Error publishing message", "error", err) time.Sleep(1 * time.Second) continue } switch publishResult.Outcome.(type) { case *rmq.StateAccepted: rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) case *rmq.StateReleased: rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) case *rmq.StateRejected: rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) stateType := publishResult.Outcome.(*rmq.StateRejected) if stateType.Error != nil { rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) } default: rmq.Warn("Message state: %v", publishResult.Outcome) } } rmq.Info("producer finished sending messages") }