diff --git a/rabbitmq_example/certs/ca_certificate.pem b/rabbitmq_example/certs/ca_certificate.pem new file mode 100644 index 0000000..a652035 --- /dev/null +++ b/rabbitmq_example/certs/ca_certificate.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDhDCCAmygAwIBAgIUJrWtguJt6ld1kg4Hu2NNVjlLbLYwDQYJKoZIhvcNAQEL +BQAwSzE6MDgGA1UEAwwxVExTR2VuU2VsZlNpZ25lZFJvb3RDQSAyMDI1LTA3LTEx +VDE2OjE3OjE0LjIyNTE5NzENMAsGA1UEBwwEJCQkJDAeFw0yNTA3MTEwODE3MTRa +Fw0zNTA3MDkwODE3MTRaMEsxOjA4BgNVBAMMMVRMU0dlblNlbGZTaWduZWRSb290 +Q0EgMjAyNS0wNy0xMVQxNjoxNzoxNC4yMjUxOTcxDTALBgNVBAcMBCQkJCQwggEi +MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC9Jbt3oHOk9+/B+7T7YrPdJlFP +18AlUfckU2j/9wjv1EX4rQBCZNs9qsNUg1FqZEJFZFZCq6bPLTC8tn1vXyFG7M1i +0XEa+sZKFF+dZCAYv7Tceq2PA9tM1lb9X/mGuQS4rvJ9V1EWm51aCkhzdecB7Q9/ +ea6ERVCYFjjginfpHu4alsbKRbLods9F8rgnNb8eAxgrvv/aSKYovW60BrnNBADR +VHG99GrWow2enYqY/k8MeznD43PLf8PYbyRyRKzWqYfVVEUtkL8zfKWB+4QgJlW6 +cKYyDuHkQL//YeyrJfXq6eQLOgG7Xbumklve9bqT2BBw+eYQQIa+jnEt4C6JAgMB +AAGjYDBeMA8GA1UdEwEB/wQFMAMBAf8wCwYDVR0PBAQDAgEGMB0GA1UdDgQWBBSf +eU6kVuBrGJAc8WJj3CCuvf8LdzAfBgNVHSMEGDAWgBSfeU6kVuBrGJAc8WJj3CCu +vf8LdzANBgkqhkiG9w0BAQsFAAOCAQEAeEPAxtTh4twF/2xMSex7XjINMEaaz2q3 +9lP+wVM1Fi7nWnC37LXe+AbbPSWBnru8UtEwU4PMVRd6bUWRWjrYj9BYVgZ5TKh1 +aGDTaphUolOk65qSnwtTwbCsOp+jDYOL68XVQoijRFB7bi1366sqfk0ioqtTwfTT +ejae55bIMTEzS2ABe4aSNIUEXHAYvrggx8A6MBiKcl5hbmC3Oq9HvKuyNq4Zvi7Z +Uvu18gjAnp6XQqATmQr2U4OoEH0n3IyHMpaRdDIS3nnlcJHlA8Fwjbjzv/gHOMBF +B2mUeeIsjMnWHqDTdbsa+++akPuQPJOfkr+laHOqJldo6noc8fl+9g== +-----END CERTIFICATE----- diff --git a/rabbitmq_example/certs/client_certificate.pem b/rabbitmq_example/certs/client_certificate.pem new file mode 100644 index 0000000..083e727 --- /dev/null +++ b/rabbitmq_example/certs/client_certificate.pem @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIIDxzCCAq+gAwIBAgIBAjANBgkqhkiG9w0BAQsFADBLMTowOAYDVQQDDDFUTFNH +ZW5TZWxmU2lnbmVkUm9vdENBIDIwMjUtMDctMTFUMTY6MTc6MTQuMjI1MTk3MQ0w +CwYDVQQHDAQkJCQkMB4XDTI1MDcxMTA4MTcxNFoXDTM1MDcwOTA4MTcxNFowKTEW +MBQGA1UEAwwNZG91eHUtYnVudHUyMjEPMA0GA1UECgwGY2xpZW50MIIBIjANBgkq +hkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA0Ry7TJdZZv+h7oLnEXxWvEY24+cYw3EG +8mm901GPNrqVFgYrFq5BpMiLtyZzDDvevc37m3cKeC35LAYbY1Tkx5k4DB6epyW0 +9+jhxcTRWLS+5qVpS+qCzSljNn6w4DkF3XvQA0PGB6LYgfA5tbwlzL9rvuE03Pgi +WK8hRbO4SM8Yr/GcitrORE6zfqYd2PWf88MKf2RFML8hWdNqqvqTy2QJCPFIHtET +RrsQnVLr8vztr+SOtygRkR2FA2K1fr/akC/j1tImTmrBnbdA/Q9/5hhoBKXrW7l7 +ved/AZQhOaoI1h8HlxK3kuGc246YeoTk2UYlStAJC0PXrxStmkzddQIDAQABo4HX +MIHUMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMBMGA1UdJQQMMAoGCCsGAQUFBwMC +MDIGA1UdEQQrMCmCDWRvdXh1LWJ1bnR1MjKCDWRvdXh1LWJ1bnR1MjKCCWxvY2Fs +aG9zdDAxBgNVHR8EKjAoMCagJKAihiBodHRwOi8vY3JsLXNlcnZlcjo4MDAwL2Jh +c2ljLmNybDAdBgNVHQ4EFgQUhYsYNDlJQGZLsrwe26oRyH8TSbgwHwYDVR0jBBgw +FoAUn3lOpFbgaxiQHPFiY9wgrr3/C3cwDQYJKoZIhvcNAQELBQADggEBAKYDDeob +uwJOQPR7UxZ0+AA8IZIREQOr7ITkDcijhBHPH7+28J8XPuZzkrw+pCalc13qWy4J +WWqoLghGAyaBaNsGPDArwvLskxP3jRWC2g1JnTzvRtn212w69FIZVZoPFHgg0wb5 +R0ZCkRM6cwsFxyC79C3PTHckkZQj2F5LZ4RLdeeKK7zFEQ4wJjWRLRDDUxE4X3vL +NSFfcUlT0a/xJXcffjPFSMSBrnTUl7kQJ18QbqteKP3TaF6ao8jVOsxuwZyUj9// ++JnXqjnh+ImAgG+xLB+4Gqyi0WyFAUxrgiKv5XhroeSqZ/gUZFdJ0svHoVpPX2M+ +hsQvcqJ1+Njfvi0= +-----END CERTIFICATE----- diff --git a/rabbitmq_example/certs/client_key.pem b/rabbitmq_example/certs/client_key.pem new file mode 100644 index 0000000..f2d4930 --- /dev/null +++ b/rabbitmq_example/certs/client_key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQIj391srVKH0ECAggA +MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBDtwxP0c06BxuF0vA1biHEdBIIE +0IHNcyO40gbVxVZeaB5PuL6p8pFreRcQHPgarsdCkD9w+1I1zcVH2RArUUKgjes6 +m2uCSQVZQGsffln8s7vy/Tf/Z2KEPjig3mWpgl7C30jokmYzVu9mKr5ob7n3OCCy +XAQx61xUxr1mHKZtvxwSoljjLxx/B/FZnxr4MQffdbd0pwjKNfXGUM3wiDzjmCCF +XK08Uz0Speh/3+R9HLOEANT7pSAAUtjzNBSLU3q1YCTStBeD19vlpRRLYdp9jEk5 +/G8KlEsTSESHNHSLZsKHnsf+Scptq1W1SOCZGaTzSo44EX8dMv9pisFEgy8OqxmG +w8l9uzd4LAtGUNXPSpYQ2prFvR0h82Gwdr7ezI3OfENB1wrjeGyGyRQH/YUXhmQO +04SHmXD7s2D2r5DkvYFFbhb+HHHsYQbWKh8nMdQDNk3DGxse8RlHOVCybteVLHv4 +LfumIkOBeQ/P0z3OouX/8fadYBJvLh0z5pw6YQoDxEbRNgQSA/dbWPyX/X2KAaae +0pqC4d31GCmXX6gwWf+5L2xo8E9EjWs5Nzs2CcqcjDUbOhdBrtaJyRT8iE1tv/wQ +LAKooFRfTlcWEs9SPVeaUabQO9h3X7fWFFaMsl8VNE2Di5RO+wo4q36m5G62hDVL +YWa/09HuDm0KKfcXVGQdtnBfSUoAaTOCC2WXbtXi9QNhbnZ80BwRZUYQD6Sa4nfG +cn5l/86WTKV07/Iv+oSP+HQUVJ06r/zTuspGaQ5e9ua8JtbDJKv+zOcL/MQ7trt3 +iMJoftc8wrjdQp8ODlSobHHBVv4Vu+5O9Q11rQPvoj2d6sFuVQ6mkoygDqQsxEjd +Qz8jeh6m4B25Wh/+yRVKlJqLSQP6w4Nd7LokBkiEQAQoQVOE8Vx1zUTYG1AWvWya +RfTNiJYJeUSABr0MMgf2yMNYo3FDk8mifVUHJgqvkzm9BqgstuUh8u6aMjs/wP2s +YiSs+Kfvpes1VNb2BO7zM5aGxoMojBkbR9PRJ3wII87LNB0GJdC8LWHu97fGE6Ev +5lC2gw/QDyLSwN/lLndoUwXE7v82BSlxiZ+I7TMOh1Sc7ZIhr/lJzumuUbQt2fGo +umo4W9agxFWE4kbLtOgVeLFM+uq+nOkoYMjumHR++AIj7V+pJ8BACA2rKXvro9Gf +r/qLxl5c6ZDiLwTWqXwsmxmzQzpV4xeZbMYeGY46lAnrvSTyNhyZyorIjyR87+hK +yUHQoV4vaXNOQvWTeXG3c5a1Y0a0csk/WWCm0yE40Ouf+1wERoo2rIzO6uEmrb5f +h1mw9p0+23grc1ymWOCGcx5Kp/50weNgrGHlfQ3s/jmYK5JvRcCK3jveTneK7Bhc +HR7qpZlXuhU66Va+gPMvHChVdwBiSifZHipy+7+Mu2LsXSXtoa86VB1BRCnGKoMl +vDg9KjOtfJpvRhEZLdb2YOUC6BnfrUn4HS94Qh4GBOkqDBpR4OemxwFNcs/YFZvH +g90eyq0hvdCoo4oW7haXsElxmSescW9005lXMxEzKEnZDNrnd2JmrEY9NNqTtHt4 +nqfsEUfPKFHdus+RGwrha59zucIcPfXPl2r4vsdOBO2M96xrT6YA/JSY8aXoeovP +llpBL++l06IllcEtQQALypq0Fa33k9qE8/tooOBbjdQe +-----END ENCRYPTED PRIVATE KEY----- diff --git a/rabbitmq_example/docker-compose.yml b/rabbitmq_example/docker-compose.yml new file mode 100644 index 0000000..f6b8f04 --- /dev/null +++ b/rabbitmq_example/docker-compose.yml @@ -0,0 +1,41 @@ +version: "3.5" + +x-environment: + &default-ecl3000-environment + # These environment variables will be used by ecl3000. + # Rabbitmq settings + # RABBITMQ_USER: "${RABBITMQ_USER}" + # RABBITMQ_PASS: "${RABBITMQ_PASS}" + +x-volumes: + &default-ecl3000-volumes + # These volumens will be used by ecl3000. + +services: + ecl3000-rabbitmq: + container_name: rabbitmq + image: rabbitmq:4.1.1-management-alpine + env_file: + - rabbitmq.env + ports: + - "4369:4369" + - "5671:5671" + - "5672:5672" + - "15691:15691" + - "15692:15692" + - "25672:25672" + - "15671:15671" + - "15672:15672" + hostname: "ecl3000-rabbitmq" + volumes: + - ecl3000-rabbitmq-data:/var/lib/rabbitmq + - ./certs:/etc/rabbitmq/certs + - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro + networks: + - ecl3000 + +volumes: + ecl3000-rabbitmq-data: + +networks: + ecl3000: diff --git a/rabbitmq_example/docker-compose1.yml b/rabbitmq_example/docker-compose1.yml new file mode 100644 index 0000000..1a711c5 --- /dev/null +++ b/rabbitmq_example/docker-compose1.yml @@ -0,0 +1,44 @@ +version: "3.5" + +x-environment: + &default-ecl3000-environment + # These environment variables will be used by ecl3000. + # Rabbitmq settings + # RABBITMQ_USER: "${RABBITMQ_USER}" + # RABBITMQ_PASS: "${RABBITMQ_PASS}" + +x-volumes: + &default-ecl3000-volumes + # These volumens will be used by ecl3000. + +services: + ecl3000-rabbitmq: + container_name: rabbitmq + image: rabbitmq:4.1.1-management-alpine + ports: + - "4369:4369" + - "5671:5671" + - "5672:5672" + - "15691:15691" + - "15692:15692" + - "25672:25672" + - "15671:15671" + - "15672:15672" + environment: + RABBITMQ_ERLANG_COOKIE: "${RABBITMQ_ERLANG_COOKIE}" + RABBITMQ_DEFAULT_USER: "${RABBITMQ_DEFAULT_USER}" + RABBITMQ_DEFAULT_PASS: "${RABBITMQ_DEFAULT_PASS}" + RABBITMQ_DEFAULT_VHOST: "${RABBITMQ_DEFAULT_VHOST}" + hostname: "ecl3000-rabbitmq" + volumes: + - ecl3000-rabbitmq-data:/var/lib/rabbitmq + - ./certs:/etc/rabbitmq/certs + - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro + networks: + - ecl3000 + +volumes: + ecl3000-rabbitmq-data: + +networks: + ecl3000: diff --git a/rabbitmq_example/publisher/publisher.go b/rabbitmq_example/publisher/publisher.go new file mode 100644 index 0000000..f0c53f8 --- /dev/null +++ b/rabbitmq_example/publisher/publisher.go @@ -0,0 +1,148 @@ +package main + +import ( + "context" + "fmt" + "time" + + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +func main() { + exchangeName := "getting-started-go-exchange" + queueName := "getting-started-go-queue" + routingKey := "routing-key" + + // Delcare Dead Letter Exchange + deadExchangeName := "getting-started-go-dead-letter-exchange" + deadQueueName := "getting-started-go-dead-letter-queue" + deadRoutingKey := "dead-letter-routing-key" + + rmq.Info("Getting started with AMQP Go AMQP 1.0 Client") + + // Create a channel to receive connection state change notifications + stateChanged := make(chan *rmq.StateChanged, 1) + go func(ch chan *rmq.StateChanged) { + for statusChanged := range ch { + rmq.Info("[producer connection]", "Status changed", statusChanged) + } + }(stateChanged) + + env := rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil) + + // Open connection + amqpConnection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + amqpConnection.NotifyStatusChange(stateChanged) + defer func() { + err = env.CloseConnections(context.Background()) + if err != nil { + rmq.Error("Error closing connection: %v\n", err) + } + close(stateChanged) + }() + + rmq.Info("AMQP connection opened for producer") + + // Create management interface + management := amqpConnection.Management() + + _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ + Name: deadExchangeName, + }) + if err != nil { + rmq.Error("Error declaring dead letter exchange", err) + return + } + + _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ + Name: deadQueueName, + }) + if err != nil { + rmq.Error("Error declaring dead letter queue", err) + return + } + + // Bind queue to exchange + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: deadExchangeName, + DestinationQueue: deadQueueName, + BindingKey: deadRoutingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // Declare exchange + _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ + Name: exchangeName, + }) + if err != nil { + rmq.Error("Error declaring exchange", err) + return + } + + // Declare queue with DLX + _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ + Name: queueName, + // add config of test DLX and DLQ + MaxLength: 50, + DeadLetterExchange: deadExchangeName, + DeadLetterRoutingKey: deadRoutingKey, + }) + if err != nil { + rmq.Error("Error declaring queue", err) + return + } + + // Bind queue to exchange + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: routingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // Create publisher + publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{ + Exchange: exchangeName, + Key: routingKey, + }, nil) + if err != nil { + rmq.Error("Error creating publisher", err) + return + } + defer publisher.Close(context.Background()) + + // Publish messages + for i := range 100 { + publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) + if err != nil { + rmq.Error("Error publishing message", "error", err) + time.Sleep(1 * time.Second) + continue + } + switch publishResult.Outcome.(type) { + case *rmq.StateAccepted: + rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + case *rmq.StateReleased: + rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) + case *rmq.StateRejected: + rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*rmq.StateRejected) + if stateType.Error != nil { + rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) + } + default: + rmq.Warn("Message state: %v", publishResult.Outcome) + } + } + rmq.Info("producer finished sending messages") +} diff --git a/rabbitmq_example/publisher/ssl/publisher_with_ssl.go b/rabbitmq_example/publisher/ssl/publisher_with_ssl.go new file mode 100644 index 0000000..6e8ca2c --- /dev/null +++ b/rabbitmq_example/publisher/ssl/publisher_with_ssl.go @@ -0,0 +1,223 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "os" + "time" + + "github.com/Azure/go-amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" + "github.com/youmark/pkcs8" +) + +func main() { + exchangeName := "getting-started-go-exchange" + queueName := "getting-started-go-queue" + routingKey := "routing-key" + + // Delcare Dead Letter Exchange + deadExchangeName := "getting-started-go-dead-letter-exchange" + deadQueueName := "getting-started-go-dead-letter-queue" + deadRoutingKey := "dead-letter-routing-key" + + rmq.Info("Getting started with AMQP Go AMQP 1.0 Client") + + // Create a channel to receive connection state change notifications + stateChanged := make(chan *rmq.StateChanged, 1) + go func(ch chan *rmq.StateChanged) { + for statusChanged := range ch { + rmq.Info("[producer connection]", "Status changed", statusChanged) + } + }(stateChanged) + + // Load CA certificate + caCert, err := os.ReadFile("../../certs/ca_certificate.pem") + if err != nil { + rmq.Error("read ca file failed", err) + return + } + // Create a CA certificate pool and add the CA certificate to it + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Load client cert + // 读取私钥文件 + keyData, err := os.ReadFile("../../certs/client_key.pem") + if err != nil { + rmq.Error("read private key file failed", err) + return + } + + // 解码PEM编码的私钥 + block, _ := pem.Decode(keyData) + if block == nil || block.Type != "ENCRYPTED PRIVATE KEY" { + rmq.Error("decode PEM block containing private key failed", err) + return + } + + // 使用密码解密 PKCS#8 私钥 + password := []byte("ecl3000") + privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) + if err != nil { + rmq.Error("parse private key failed by password failed", err) + return + } + + // 还原为 PEM 格式 + pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + panic(err) + } + + pemBlock := &pem.Block{ + Type: "PRIVATE KEY", + Bytes: pemBytes, + } + + certPEM, err := os.ReadFile("../../certs/client_certificate.pem") + if err != nil { + rmq.Error("read cert file failed", err) + return + } + + clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock)) + if err != nil { + rmq.Error("load client cert failed", err) + return + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{clientCert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + ServerName: "douxu-buntu22", // the server name should match the name on the certificate + } + + // Setup environment with ssl + env := rmq.NewEnvironment("amqps://192.168.2.104:5671/", &rmq.AmqpConnOptions{ + SASLType: amqp.SASLTypeExternal(""), + TLSConfig: tlsConfig, + }) + + // env = rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil) + + // Open connection + amqpConnection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + amqpConnection.NotifyStatusChange(stateChanged) + defer func() { + err = env.CloseConnections(context.Background()) + if err != nil { + rmq.Error("Error closing connection: %v\n", err) + } + close(stateChanged) + }() + + rmq.Info("AMQP connection opened for producer") + + // Create management interface + management := amqpConnection.Management() + + _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ + Name: deadExchangeName, + }) + if err != nil { + rmq.Error("Error declaring dead letter exchange", err) + return + } + + _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ + Name: deadQueueName, + }) + if err != nil { + rmq.Error("Error declaring dead letter queue", err) + return + } + + // Bind queue to exchange + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: deadExchangeName, + DestinationQueue: deadQueueName, + BindingKey: deadRoutingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // Declare exchange + _, err = management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ + Name: exchangeName, + }) + if err != nil { + rmq.Error("Error declaring exchange", err) + return + } + + // Declare queue with DLX + _, err = management.DeclareQueue(context.TODO(), &rmq.ClassicQueueSpecification{ + Name: queueName, + // add config of test DLX and DLQ + MaxLength: 50, + DeadLetterExchange: deadExchangeName, + DeadLetterRoutingKey: deadRoutingKey, + }) + if err != nil { + rmq.Error("Error declaring queue", err) + return + } + + // Bind queue to exchange + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: routingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // Create publisher + publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{ + Exchange: exchangeName, + Key: routingKey, + }, nil) + if err != nil { + rmq.Error("Error creating publisher", err) + return + } + defer publisher.Close(context.Background()) + + // Publish messages + for i := range 100 { + publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) + if err != nil { + rmq.Error("Error publishing message", "error", err) + time.Sleep(1 * time.Second) + continue + } + switch publishResult.Outcome.(type) { + case *rmq.StateAccepted: + rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + case *rmq.StateReleased: + rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) + case *rmq.StateRejected: + rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*rmq.StateRejected) + if stateType.Error != nil { + rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) + } + default: + rmq.Warn("Message state: %v", publishResult.Outcome) + } + } + rmq.Info("producer finished sending messages") +} diff --git a/rabbitmq_example/rabbitmq.conf b/rabbitmq_example/rabbitmq.conf new file mode 100644 index 0000000..9293f89 --- /dev/null +++ b/rabbitmq_example/rabbitmq.conf @@ -0,0 +1,22 @@ +# 确保允许PLAIN认证 +auth_mechanisms.1 = PLAIN +auth_mechanisms.2 = AMQPLAIN + +# 检查默认vhost +# 允许admin用户通过远程方式连接 +loopback_users.admin = false + + +# disables non-TLS listeners, only TLS-enabled clients will be able to connect +# 开启此项配置会导致只能通过TLS端口访问 +listeners.tcp = none +# ssl config +listeners.ssl.default = 5671 +ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem +ssl_options.certfile = /etc/rabbitmq/certs/server_douxu-buntu22_certificate.pem +ssl_options.keyfile = /etc/rabbitmq/certs/server_douxu-buntu22_key.pem +# 启用双向认证 +ssl_options.verify = verify_peer +ssl_options.fail_if_no_peer_cert = true +# If the private key file is password protected, set this value +ssl_options.password = ecl3000 \ No newline at end of file diff --git a/rabbitmq_example/rabbitmq.env b/rabbitmq_example/rabbitmq.env new file mode 100644 index 0000000..11ea163 --- /dev/null +++ b/rabbitmq_example/rabbitmq.env @@ -0,0 +1,5 @@ +# Taiga's RabbitMQ settings - Variables to leave messages for the realtime and asynchronous events +RABBITMQ_DEFAULT_USER=ecl3000 # user to connect to RabbitMQ +RABBITMQ_DEFAULT_PASS=ecl3000 # RabbitMQ user's password +RABBITMQ_DEFAULT_VHOST=/ +RABBITMQ_ERLANG_COOKIE=jessequ-secret-erlang-cookie # unique value shared by any connected instance of RabbitMQ \ No newline at end of file diff --git a/rabbitmq_example/receiver/message_release/receiver_with_release.go b/rabbitmq_example/receiver/message_release/receiver_with_release.go new file mode 100644 index 0000000..8391671 --- /dev/null +++ b/rabbitmq_example/receiver/message_release/receiver_with_release.go @@ -0,0 +1,243 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + amqp "github.com/Azure/go-amqp" + amqp091 "github.com/rabbitmq/amqp091-go" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" + "github.com/youmark/pkcs8" +) + +func main() { + exchangeName := "getting-started-go-exchange" + queueName := "getting-started-go-queue" + routingKey := "routing-key" + + // Delcare Dead Letter Exchange + deadExchangeName := "getting-started-go-dead-letter-exchange" + deadQueueName := "getting-started-go-dead-letter-queue" + deadRoutingKey := "dead-letter-routing-key" + + rmq.Info("Starting AMQP Go AMQP 1.0 Consumer") + + // Create a channel to receive connection state change notifications + stateChanged := make(chan *rmq.StateChanged, 1) + go func(ch chan *rmq.StateChanged) { + for statusChanged := range ch { + rmq.Info("[consumer connection]", "Status changed", statusChanged) + } + }(stateChanged) + + // Load CA certificate + caCert, err := os.ReadFile("../../certs/ca_certificate.pem") + if err != nil { + rmq.Error("read ca file failed", err) + return + } + // Create a CA certificate pool and add the CA certificate to it + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Load client cert + // 读取私钥文件 + keyData, err := os.ReadFile("../../certs/client_key.pem") + if err != nil { + rmq.Error("read private key file failed", err) + return + } + + // 解码PEM编码的私钥 + block, _ := pem.Decode(keyData) + if block == nil || block.Type != "ENCRYPTED PRIVATE KEY" { + rmq.Error("decode PEM block containing private key failed", err) + return + } + + // 使用密码解密 PKCS#8 私钥 + password := []byte("ecl3000") + privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) + if err != nil { + rmq.Error("parse private key failed by password failed", err) + return + } + + // 还原为 PEM 格式 + pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + panic(err) + } + + pemBlock := &pem.Block{ + Type: "PRIVATE KEY", + Bytes: pemBytes, + } + + certPEM, err := os.ReadFile("../../certs/client_certificate.pem") + if err != nil { + rmq.Error("read cert file failed", err) + return + } + + clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock)) + if err != nil { + rmq.Error("load client cert failed", err) + return + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{clientCert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + ServerName: "douxu-buntu22", // the server name should match the name on the certificate + } + + // Setup environment with ssl + env := rmq.NewEnvironment("amqps://192.168.2.104:5671/", &rmq.AmqpConnOptions{ + SASLType: amqp.SASLTypeExternal(""), + TLSConfig: tlsConfig, + }) + + // Open connection + amqpConnection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + amqpConnection.NotifyStatusChange(stateChanged) + defer func() { + err = env.CloseConnections(context.Background()) + if err != nil { + rmq.Error("Error closing connection: %v\n", err) + } + close(stateChanged) + }() + + rmq.Info("AMQP connection opened for consumer") + + // Create management interface + management := amqpConnection.Management() + + // Bind queue to exchange + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: deadExchangeName, + DestinationQueue: deadQueueName, + BindingKey: deadRoutingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // Bind queue to exchange (idempotent operation) + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: routingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // use go func to contain this dead consumer + go func() { + conn, err := amqp091.DialTLS_ExternalAuth("amqps://192.168.2.104:5671/", tlsConfig) + // conn, err := amqp091.Dial("amqp://ecl3000:ecl3000@192.168.2.104:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + // ch.ExchangeDeclare() + + // use QueueDeclare func make sure the queue exists + q, err := ch.QueueDeclare( + deadQueueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + // ch.QueueBind() + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + for d := range msgs { + log.Printf("Received a message from DLQ: %s", d.Body) + } + }() + + // Create consumer + consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) + if err != nil { + rmq.Error("Error creating consumer", err) + return + } + defer consumer.Close(context.Background()) + + consumerContext, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + // Start a goroutine to handle shutdown signals + go func() { + <-sigChan + rmq.Info("Received shutdown signal, stopping consumer...") + cancel() + }() + + // Consume messages + rmq.Info("Consumer ready to receive messages") + for { + deliveryContext, err := consumer.Receive(consumerContext) + if errors.Is(err, context.Canceled) { + rmq.Info("[Consumer]", "consumer closed gracefully") + return + } + if err != nil { + rmq.Error("[Consumer]", "Error receiving message", err) + return + } + + rmq.Info("[Consumer]", "Received message", + fmt.Sprintf("%s", deliveryContext.Message().Data)) + + err = deliveryContext.Requeue(context.Background()) + if err != nil { + rmq.Error("release message to queue", err) + return + } + } +} + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} diff --git a/rabbitmq_example/receiver/receiver.go b/rabbitmq_example/receiver/receiver.go new file mode 100644 index 0000000..fb48515 --- /dev/null +++ b/rabbitmq_example/receiver/receiver.go @@ -0,0 +1,171 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + amqp "github.com/rabbitmq/amqp091-go" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" +) + +func main() { + exchangeName := "getting-started-go-exchange" + queueName := "getting-started-go-queue" + routingKey := "routing-key" + + // Delcare Dead Letter Exchange + deadExchangeName := "getting-started-go-dead-letter-exchange" + deadQueueName := "getting-started-go-dead-letter-queue" + deadRoutingKey := "dead-letter-routing-key" + + rmq.Info("Starting AMQP Go AMQP 1.0 Consumer") + + // Create a channel to receive connection state change notifications + stateChanged := make(chan *rmq.StateChanged, 1) + go func(ch chan *rmq.StateChanged) { + for statusChanged := range ch { + rmq.Info("[consumer connection]", "Status changed", statusChanged) + } + }(stateChanged) + + // Setup environment + env := rmq.NewEnvironment("amqp://ecl3000:ecl3000@192.168.2.104:5672/", nil) + + // Open connection + amqpConnection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + amqpConnection.NotifyStatusChange(stateChanged) + defer func() { + err = env.CloseConnections(context.Background()) + if err != nil { + rmq.Error("Error closing connection: %v\n", err) + } + close(stateChanged) + }() + + rmq.Info("AMQP connection opened for consumer") + + // Create management interface + management := amqpConnection.Management() + + // Bind queue to exchange + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: deadExchangeName, + DestinationQueue: deadQueueName, + BindingKey: deadRoutingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // Bind queue to exchange (idempotent operation) + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: routingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // use go func to contain this dead consumer + go func() { + conn, err := amqp.Dial("amqp://ecl3000:ecl3000@192.168.2.104:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + // ch.ExchangeDeclare() + + // use QueueDeclare func make sure the queue exists + q, err := ch.QueueDeclare( + deadQueueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + // ch.QueueBind() + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + for d := range msgs { + log.Printf("Received a message from DLQ: %s", d.Body) + } + }() + + // Create consumer + consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) + if err != nil { + rmq.Error("Error creating consumer", err) + return + } + defer consumer.Close(context.Background()) + + consumerContext, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + // Start a goroutine to handle shutdown signals + go func() { + <-sigChan + rmq.Info("Received shutdown signal, stopping consumer...") + cancel() + }() + + // Consume messages + rmq.Info("Consumer ready to receive messages") + for { + deliveryContext, err := consumer.Receive(consumerContext) + if errors.Is(err, context.Canceled) { + rmq.Info("[Consumer]", "consumer closed gracefully") + return + } + if err != nil { + rmq.Error("[Consumer]", "Error receiving message", err) + return + } + + rmq.Info("[Consumer]", "Received message", + fmt.Sprintf("%s", deliveryContext.Message().Data)) + + err = deliveryContext.Accept(context.Background()) + if err != nil { + rmq.Error("Error accepting message", err) + return + } + } +} + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} diff --git a/rabbitmq_example/receiver/ssl/receiver.go b/rabbitmq_example/receiver/ssl/receiver.go new file mode 100644 index 0000000..f92163f --- /dev/null +++ b/rabbitmq_example/receiver/ssl/receiver.go @@ -0,0 +1,243 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + amqp "github.com/Azure/go-amqp" + amqp091 "github.com/rabbitmq/amqp091-go" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" + "github.com/youmark/pkcs8" +) + +func main() { + exchangeName := "getting-started-go-exchange" + queueName := "getting-started-go-queue" + routingKey := "routing-key" + + // Delcare Dead Letter Exchange + deadExchangeName := "getting-started-go-dead-letter-exchange" + deadQueueName := "getting-started-go-dead-letter-queue" + deadRoutingKey := "dead-letter-routing-key" + + rmq.Info("Starting AMQP Go AMQP 1.0 Consumer") + + // Create a channel to receive connection state change notifications + stateChanged := make(chan *rmq.StateChanged, 1) + go func(ch chan *rmq.StateChanged) { + for statusChanged := range ch { + rmq.Info("[consumer connection]", "Status changed", statusChanged) + } + }(stateChanged) + + // Load CA certificate + caCert, err := os.ReadFile("../../certs/ca_certificate.pem") + if err != nil { + rmq.Error("read ca file failed", err) + return + } + // Create a CA certificate pool and add the CA certificate to it + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Load client cert + // 读取私钥文件 + keyData, err := os.ReadFile("../../certs/client_key.pem") + if err != nil { + rmq.Error("read private key file failed", err) + return + } + + // 解码PEM编码的私钥 + block, _ := pem.Decode(keyData) + if block == nil || block.Type != "ENCRYPTED PRIVATE KEY" { + rmq.Error("decode PEM block containing private key failed", err) + return + } + + // 使用密码解密 PKCS#8 私钥 + password := []byte("ecl3000") + privateKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, password) + if err != nil { + rmq.Error("parse private key failed by password failed", err) + return + } + + // 还原为 PEM 格式 + pemBytes, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + panic(err) + } + + pemBlock := &pem.Block{ + Type: "PRIVATE KEY", + Bytes: pemBytes, + } + + certPEM, err := os.ReadFile("../../certs/client_certificate.pem") + if err != nil { + rmq.Error("read cert file failed", err) + return + } + + clientCert, err := tls.X509KeyPair(certPEM, pem.EncodeToMemory(pemBlock)) + if err != nil { + rmq.Error("load client cert failed", err) + return + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{clientCert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + ServerName: "douxu-buntu22", // the server name should match the name on the certificate + } + + // Setup environment with ssl + env := rmq.NewEnvironment("amqps://192.168.2.104:5671/", &rmq.AmqpConnOptions{ + SASLType: amqp.SASLTypeExternal(""), + TLSConfig: tlsConfig, + }) + + // Open connection + amqpConnection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + amqpConnection.NotifyStatusChange(stateChanged) + defer func() { + err = env.CloseConnections(context.Background()) + if err != nil { + rmq.Error("Error closing connection: %v\n", err) + } + close(stateChanged) + }() + + rmq.Info("AMQP connection opened for consumer") + + // Create management interface + management := amqpConnection.Management() + + // Bind queue to exchange + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: deadExchangeName, + DestinationQueue: deadQueueName, + BindingKey: deadRoutingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // Bind queue to exchange (idempotent operation) + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: routingKey, + }) + if err != nil { + rmq.Error("Error binding", err) + return + } + + // use go func to contain this dead consumer + go func() { + conn, err := amqp091.DialTLS_ExternalAuth("amqps://192.168.2.104:5671/", tlsConfig) + // conn, err := amqp091.Dial("amqp://ecl3000:ecl3000@192.168.2.104:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + // ch.ExchangeDeclare() + + // use QueueDeclare func make sure the queue exists + q, err := ch.QueueDeclare( + deadQueueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + // ch.QueueBind() + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + for d := range msgs { + log.Printf("Received a message from DLQ: %s", d.Body) + } + }() + + // Create consumer + consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) + if err != nil { + rmq.Error("Error creating consumer", err) + return + } + defer consumer.Close(context.Background()) + + consumerContext, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + // Start a goroutine to handle shutdown signals + go func() { + <-sigChan + rmq.Info("Received shutdown signal, stopping consumer...") + cancel() + }() + + // Consume messages + rmq.Info("Consumer ready to receive messages") + for { + deliveryContext, err := consumer.Receive(consumerContext) + if errors.Is(err, context.Canceled) { + rmq.Info("[Consumer]", "consumer closed gracefully") + return + } + if err != nil { + rmq.Error("[Consumer]", "Error receiving message", err) + return + } + + rmq.Info("[Consumer]", "Received message", + fmt.Sprintf("%s", deliveryContext.Message().Data)) + + err = deliveryContext.Accept(context.Background()) + if err != nil { + rmq.Error("Error accepting message", err) + return + } + } +} + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +}