diff --git a/.ci/certs/ca_certificate.pem b/.ci/certs/ca_certificate.pem index f1ae0b2..a67df16 100644 --- a/.ci/certs/ca_certificate.pem +++ b/.ci/certs/ca_certificate.pem @@ -1,21 +1,21 @@ -----BEGIN CERTIFICATE----- -MIIDhjCCAm6gAwIBAgIUJ2lTbiccSFtA9+8eGPQD5yGJ7w8wDQYJKoZIhvcNAQEL -BQAwTDE7MDkGA1UEAwwyVExTR2VuU2VsZlNpZ25lZHRSb290Q0EgMjAyMy0xMC0w -OFQwODoxNjowMy41OTA0NTQxDTALBgNVBAcMBCQkJCQwHhcNMjMxMDA4MTUxNjAz -WhcNMzMxMDA1MTUxNjAzWjBMMTswOQYDVQQDDDJUTFNHZW5TZWxmU2lnbmVkdFJv -b3RDQSAyMDIzLTEwLTA4VDA4OjE2OjAzLjU5MDQ1NDENMAsGA1UEBwwEJCQkJDCC -ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANdiiGj37094gAHfVpbIQHfu -ccBVozpexrYjDCbjw4IyJJOajJRNGbYZwEt3Jt5NaDc+zyoBZpKaZWDEjOxbNYkd -MtIHyFW4V4ooA6pySR9pzMI91dXoCkzL9Ex23Zrj0KF70qBQuPTbF5bnAbMELFuv -quFnfMw2ALsFrWh2DOwnMlt1hbdj6Iapl2yRGhVSgsr72SK+67b+b7WH02VGDrfm -Y3qqx3xAI6woKSE2Ot14Csak/iR1xit68X5GhzvSdOos0Yo3I4v8mlFEO+kpKWB0 -7y3Hb5AU/hqvSOwLRA+CV09bxN4N5rOfFHkPVuVMXQzX9mLCxzxroZn/sQzkrtMC -AwEAAaNgMF4wDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAQYwHQYDVR0OBBYE -FNSsn21DVr1XhhqmU+wMnLWFZc55MB8GA1UdIwQYMBaAFNSsn21DVr1XhhqmU+wM -nLWFZc55MA0GCSqGSIb3DQEBCwUAA4IBAQDRc1mAERvR1VPOaMevcB2pGaQfRLkN -fYgiO7rxd77FIQLieCbNNZ6z/fDRkBjgZ9xzAWl0vFJ0xZ0FSdUtAXEa9ES7r7eq -XOSW/5CRPeib4nMDeQvTSiCx5QqlIz4oUwW9bbpPcBQXM0IVZwo1Jbye/BGrjAmQ -Z3a5ph0f85Shjy2Yt9JB9BDCWOK8EU294CiKMUvdtQwSaQpl8GQfmvzWKAL4encu -ryEAPTDT9zuQi2bOCDY5QMwVNS6mDAsqbvMjOaHD/Cdzl26rgv+8QLVNDUvGfGtD -58bWugHyxCdnDToCtIEaJaoi7izKd0bILbuQXS7oKfryJpHwO+9U8ZjT +MIIDhDCCAmygAwIBAgIUMNeYbv9MMCXx9e/o+BO7JYbdHJowDQYJKoZIhvcNAQEL +BQAwSzE6MDgGA1UEAwwxVExTR2VuU2VsZlNpZ25lZFJvb3RDQSAyMDI1LTAyLTI3 +VDE1OjQ0OjU4Ljg4MDUzMDENMAsGA1UEBwwEJCQkJDAeFw0yNTAyMjcxNDQ0NTha +Fw0zNTAyMjUxNDQ0NThaMEsxOjA4BgNVBAMMMVRMU0dlblNlbGZTaWduZWRSb290 +Q0EgMjAyNS0wMi0yN1QxNTo0NDo1OC44ODA1MzAxDTALBgNVBAcMBCQkJCQwggEi +MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDqAwl11OZzzMDh1oaaA/IbnU39 +VCDE3BsKKg3arhVGhYaSbYEtaJWhNbB12qkw2GEFeSl0mZCSorTJmQHmcUjcO0yH +zRIM5vzEscPOffUBfIxXiVehPyyNJa9P2IRE65i3d7mcmR62dG6EWtj1tW0VLKGc +d3STpmGoA9b8tuJZq9vt6ivDTv7OECCLmDR2IHKoAXKZQmsDgI1Dy0UuLCEWIzOq +r8WAq1at28AAiDL9Rh0bxyQ8oREx86zjLOXOsJ8CNNWFRheAFh65hWWMpM4SavEV +pW0kE9qCfopBGl4xpbBLE/gzkhMkUZVwri6cGakzvA+dmdCsS5D3t2ZHWIkzAgMB +AAGjYDBeMA8GA1UdEwEB/wQFMAMBAf8wCwYDVR0PBAQDAgEGMB0GA1UdDgQWBBQy +XdLfduEv+NHMZvy4ASFH6BIZWjAfBgNVHSMEGDAWgBQyXdLfduEv+NHMZvy4ASFH +6BIZWjANBgkqhkiG9w0BAQsFAAOCAQEAF0gI9aTScyOoImqvHQZXdfZlgphT8E/o +ks2DDY4ZC1KAIYxRj2y+M9zmrQqSbfhSSuEZ8IKaFKMiBPALBlEVrVJUGAoUAjrU +C9zxSg2TOjJqO2lJD3mMJ0u36cmv0sIPhlm0DRnxWg+1eKmAfEn/DPSj6V8xwHqH +7tpPEea19RgKwBCSOnVUmOwDnIEzCy9H/A8U4P2XzFFEIWSeGWlDHFRy8j4P5YyH +0TRpwR4JGh5t/5+bo4hfdHxSeY5wsWk2k+lfNszfau8qEDdFQVASXmJ6iTelSbqv +pSkMsWk9u8z7ENA+w3Qzhwg1OzOluDl8EVAJziSDfGZpWqeVWK1E4A== -----END CERTIFICATE----- diff --git a/.ci/certs/ca_key.pem b/.ci/certs/ca_key.pem index 75349f7..ffd51ee 100644 --- a/.ci/certs/ca_key.pem +++ b/.ci/certs/ca_key.pem @@ -1,30 +1,28 @@ ------BEGIN ENCRYPTED PRIVATE KEY----- -MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIm6kLjkvzznECAggA -MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECGjVddOBQ1QOBIIEyOAafHUtExxT -tY2ONQkUkXZG4/fIDvuNwt8IIkNUIGVp9WEDd4Mh5Ofa52uUKmlhj+FyRZ6u2mGT -VHU65e4kBYB10n0oybRPvRU1tFxgr8qI0T7Fqnx7WJAP3m0Bo/tWfqE0GHRrspZV -gABLVTOFvHE8oOsEh/ndMe+Y2qGaLsl+MF3jkfYAxSK2QwEK9HDa16Xsit7hqVbz -JUyvBmQVfTZzanIall+EpUntv/vlILKIlAFOZUXIZ/iL8LTQCmpycfGLknr4/9KP -gCYZmWFS18X9KVAwgV2kSdUebWH9phDosSw6fZh843l1SQvjG65PgrnWYb6Fw7B4 -s3Nk6bXjHYtvLT19EUrQOdeOegynaQQBs5WIcp9LbKT3LJVQpaVGV9thi+LPz1Bu -Lep583ayXTecA7Dbfa6S9R97TgRoMdDWaz1kTBReQTUhrL5736A38gpwJeBZDqel -39sRULCKARz2ZX0YpeZCmfVhVVSguO5gCfACsqHoOiTxYOA97GR128BcpEVJ1lst -sZZNwT3m6xIcXbS37EImhUMGiQ5fyGZ+8FIozTL9xNopIR97b3ceA9CoLc7EVcFC -RxHvh1HwtpyBDyopJp2wYu31nqcSDsJh+lmjo5R7bqvDDmflfkfu1G45JkXKr3Vz -M89S/y6Uo8W/EYT2MPYTsqcobtjx6oM1RYkVuYTR6cyUgQkHGtptkzGKxYE8dYwQ -4EIm87czYvCW0Mrp6yy0NGKzqBb+19Kuqc0HO+YezEQ8RjOVb8+D+cuCp2ZSItJ/ -S9m7BDTOzTS2lBotrFVkSbzaQafAmxQiaSP7gd0M9dnC0AOB2ILbyRAyIDQ0Y2dm -kMbiewQwNFiY9moRtgzHuHRfFZu4w996Q20cYZyMbxDfY17QoZQzfKWQH1BD7nq5 -G4RFpInt6q4q0F94nQWCif195VZF64+8ETMteJqtBFhUSQbq7PzKdpuf8NFxczLt -MDEWg2l6qNLP+zswulcVbFcC/HxAu4UtYf2m5MAtaurXZZ/+xPW5c/0caWMycQ7g -fbkYvC4j0OT7aqqMd1SYzEx7l75Vqn1sr2BsXZFoaqK2c/1LIb6U1kAhyhDQ46rV -0v6q4GUk4fdnE4N+9MXWBvlKSnqEVYlE54IuSUrYRuuBhO4LQpPMOAafQPR6QCTI -ikqWVmLAj50n7uba0Ao9lRKR7bFpdOQob/nYMTKT6YQaohYhbCv4zIK9fDgWWiXE -a2ecIP3KiZzw7oLMKXLcDt1RkkzE1FQxLfOeZ5EP4RwBGPDvR88ELO+lGQQt3VnS -FIZoXBUFUf7bEUzTwM4240zkjDYQPxD1j769Zq/JZfKyOEXXOJT8xHiwMg3ARWuE -hGlNKApbJGMn9myC61KaGMyCKRvMVxI25w3LfI4OAWt+67BB5OuAG11nmn9Kja73 -bhMFDIMZ8kE0p9IWfpiUJlDB9odGEc4z3Jl5CqBVDkMCDxq9BQDM0hSDk+ov8FO9 -g03PqMxvsxd2c56vkMtNY4hSGkYfN0RsM3vTXXLtPwRwRZURCmKK76BmsT4oBd+W -orqH4SABIAbYTwNOb7k/wOc4EfucawBqMG4g+29qewD67+EXjB0GadqOXRoQyhRq -hd74uUK5gzJOqStqiowQ0A== ------END ENCRYPTED PRIVATE KEY----- +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDqAwl11OZzzMDh +1oaaA/IbnU39VCDE3BsKKg3arhVGhYaSbYEtaJWhNbB12qkw2GEFeSl0mZCSorTJ +mQHmcUjcO0yHzRIM5vzEscPOffUBfIxXiVehPyyNJa9P2IRE65i3d7mcmR62dG6E +Wtj1tW0VLKGcd3STpmGoA9b8tuJZq9vt6ivDTv7OECCLmDR2IHKoAXKZQmsDgI1D +y0UuLCEWIzOqr8WAq1at28AAiDL9Rh0bxyQ8oREx86zjLOXOsJ8CNNWFRheAFh65 +hWWMpM4SavEVpW0kE9qCfopBGl4xpbBLE/gzkhMkUZVwri6cGakzvA+dmdCsS5D3 +t2ZHWIkzAgMBAAECggEAZMk+D9PMFV/ASwQcIMVGRwJvDoZnPqIVu0D1ipOjciYc +GYC0PBxpJW98OqYcbH8k+jh+1Es3axBMkO8nVFrCKKgZg/ucpJXvk7+EN7EkDqnX +v/PVHAubYocyhE8aWJynv4z/EiUYhziKSNLf0qN7Ab2hNUR1nwnv0W8l7t3NixSY +4sIuGhm2QbqfHKvvG//GWOmvRIYLdJPZ69tJR8sOidIpNY2LI4tNXlC2fdPI+RaT +pOJcULSi+AZItyxHwELDR3u5xuWJ3KMcrBRiMees//dhg8Sga0tmBIW1vN33eKtW +wOkq48hBGUi8sfrRfVSiJBquZFURYrzC1J2EZQOcwQKBgQD3DuZl4NQiGaL3afY6 +fp2hstVjRm8Xdy2AdCPXx5w+R8CJUpLDpHfYavT7pFUx7W1ERfbsqHujMbb2Zaq9 +FyYdXvIpcqFjCJl16dLaDmzvDn98v9mWEB9I+NeqXSBVbbPhCFSuKElpx5OrrxeQ +CUMfofoTtQlHvSYUsz0vm/aiMQKBgQDye0HNNd41NiaXVYUQenNbwOBO+POUBxj2 +pccNTpQulZEXug8IomfDLQ+cA3Dsqlf16tran4wVPUO53n2By07nof/tHV6y0IF9 +oQPznCrbaKl35e4MlvDf0+FfGKNFWExwVeKJJGDVUWgBa+cXFvtUJHYDGb7Bo6+C +5NyqHw6EowKBgCyS056t4ZgFaBGbXIFRNr9ltHokywZAykTSr2TO7rGN4H7mFvSV +R8oUAf8ktvo7C+u1c8de3m+jGI976EIVWxsRdj9kHxnvA0Dy3sfYsm6u/vFS677X +Sc2wl7h09NB06m8/QYfqXNRo3YusG2QxR5r9blD/6Jy405YIgJGGYgkBAoGAGaAp +DhTZTOpSHcAt9dXbByFVE0OACm7NlpNie+eIBXxM/yLsn876BEho0+YRMxG1hgmx +41TlKwF0fNokjWj9B8G5GEf4UBF0/d/cWQxyAwoGjuM/yxjQj/cGZFRoPNXeDikl +bbTofuLBiRTsMSZ+nR/VUPKRlElGLSEeqOPrVt0CgYEAugfHj4AqmDfL1F3rbw+D +XwiyvRqwLBYzxZ0t8Hbm0uht0MKjTgJ/G2fkC7Y8aJgEn8jstfoVSh9sXUKgUnCA +MAuil2220ctEh04bp/na1z/9igJWGiJNbRVXjF+BRAbSJ25RY8BX5dDydcWbqzyr +eT5xfQPC5smWZlI0l0JK1bs= +-----END PRIVATE KEY----- diff --git a/.ci/certs/client_localhost.p12 b/.ci/certs/client_localhost.p12 index 1d688c5..4d84a9b 100644 Binary files a/.ci/certs/client_localhost.p12 and b/.ci/certs/client_localhost.p12 differ diff --git a/.ci/certs/client_localhost_certificate.pem b/.ci/certs/client_localhost_certificate.pem index 1af5a9f..b049ba1 100644 --- a/.ci/certs/client_localhost_certificate.pem +++ b/.ci/certs/client_localhost_certificate.pem @@ -1,22 +1,22 @@ -----BEGIN CERTIFICATE----- -MIIDvDCCAqSgAwIBAgIBAjANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH -ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTEwLTA4VDA4OjE2OjAzLjU5MDQ1NDEN -MAsGA1UEBwwEJCQkJDAeFw0yMzEwMDgxNTE2MDNaFw0zMzEwMDUxNTE2MDNaMCUx -EjAQBgNVBAMMCWxvY2FsaG9zdDEPMA0GA1UECgwGY2xpZW50MIIBIjANBgkqhkiG -9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxoOGcKsURRZG0D89J8rGcolZVqX56rDgA0Ma -cn4AosMQTZ86XAq+Ygn6QVcFV3NjuHxb29vsZfjSYbBpgQNLfpXN9EfeswVvaJND -wblKdRo10RTPslFewI4Aac88GXva+3DBMCwv3viI2S69apcuZgGw0+EKDh+JmbcM -sdH81hZhYjmrS529qSOIji8vJYFTCQPMbGN17elnA7pZaHEmPKj5mzm0veSBvCwU -OZORr4eFE7Nct5RmhLm8DWT0EBRUWT8D6/b6+0ln32Yv30YNpKrua5wkn+kxsvKJ -tQRRKYRyfegSj6mo6L4za1ZvwV/JMN5mDLQUajvtOCsD4NpKcQIDAQABo4HPMIHM -MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMBMGA1UdJQQMMAoGCCsGAQUFBwMCMCoG -A1UdEQQjMCGCCWxvY2FsaG9zdIIJUFJPS09GSUVWgglsb2NhbGhvc3QwMQYDVR0f +MIIDvDCCAqSgAwIBAgIBAjANBgkqhkiG9w0BAQsFADBLMTowOAYDVQQDDDFUTFNH +ZW5TZWxmU2lnbmVkUm9vdENBIDIwMjUtMDItMjdUMTU6NDQ6NTguODgwNTMwMQ0w +CwYDVQQHDAQkJCQkMB4XDTI1MDIyNzE0NDQ1OVoXDTM1MDIyNTE0NDQ1OVowJTES +MBAGA1UEAwwJbG9jYWxob3N0MQ8wDQYDVQQKDAZjbGllbnQwggEiMA0GCSqGSIb3 +DQEBAQUAA4IBDwAwggEKAoIBAQC7L/xjD4iHTCf2IfXd/fayxkX0+dI+Z2y+latM +UFvn4GpDIz0Acfqjp3/NhShbWoHqOhR/w5l20J9Ljt2RmecpybK717Flst8Q0g0C +xm3GaN7fVLAxoWAIbzU7cAZMv0SRuu2RIo2HTt5i2xBljA5Bf6wMZqMFxvnNWNGt +TIWVUzCjeqWqPUi84XdHu0GWyQ11rIjCnw5zY3D8EFc+HoTgI33y81EABps7ybmH +BdUtMsAFEXgk3lJplaLeIvlM/HzBk+ffkqpcwC6kTnoR7Nww8a2aE6wHq91Hj+R7 +mmAo8Hpx0grott/pmwWOd2Ld1w3gxC3I7D6yqjfT4Rjc6FyxAgMBAAGjgdAwgc0w +CQYDVR0TBAIwADALBgNVHQ8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwIwKwYD +VR0RBCQwIoIJbG9jYWxob3N0ggpGMjNOMDQ5MlhUgglsb2NhbGhvc3QwMQYDVR0f BCowKDAmoCSgIoYgaHR0cDovL2NybC1zZXJ2ZXI6ODAwMC9iYXNpYy5jcmwwHQYD -VR0OBBYEFLPquWS+kT4+JE+cssrriRkL9UADMB8GA1UdIwQYMBaAFNSsn21DVr1X -hhqmU+wMnLWFZc55MA0GCSqGSIb3DQEBCwUAA4IBAQC1Pz8SahCsQyiyuu6dz391 -KENabMpCwb/2wxljN5lfkOvvUrVmupld8/5nIdN2drL9jCrfbBz5ZRz+9Ryb8yrc -sioH8Y9RNU5Gc3UJo7aAoMx4sIib6uJ+UO4fVlVvD4cN2h2sLHxtkI173Oo7lnMf -4c+75iyZYdkEDXaOk+UbR8dncCj84y1Sbt0FYfCMT688O4HYkIGA3xGmqyX7PYV/ -CP8CNKwJEuZpQRaGdClkmAmoEPyuFW9ec+A9gOrgCpuFJBI4MRcicC5Q+qmx+LTM -pZ2louMnnlTRoj3tL4aDgfdwV0YGxyIjIzuYLy6QCF8MZ/TLwPK0C3oXXuYmCLBO +VR0OBBYEFLmThoy0pKufr0QWZRwg1FJGdcFRMB8GA1UdIwQYMBaAFDJd0t924S/4 +0cxm/LgBIUfoEhlaMA0GCSqGSIb3DQEBCwUAA4IBAQCk4Ytqqtymc8h0M2HiIyhK +p2Dkf7GZRjBPvC6ULIxMEixslcDCkVTkLaYKRJL7xv37RNfc6kgi9K1IjPfDUtEm +IDm56hRhIvLkH/BsUbhhJsZnYBN1GbqmFNtNP7Zj2Yt6uAwFkFB6gnK7RflSwVaG +EYZhs8QEmZ1VhGymJorp5HGI6EcVkOhG3pScp5yaAqM2cKy7CLnZJfpCzQ12LZ7/ +2UEKRtfILvN8kWaWOaGCM7t3Z2i6bfEh/1WZBmZnyK+zDBxv/YDp2iave/i7r/dY +tOZA1KB2OMWZY4pHmiEior05yf0o7xNctPdwy3+IvRYAH6FJhMA29XoizPW8Cvtk -----END CERTIFICATE----- diff --git a/.ci/certs/client_localhost_key.pem b/.ci/certs/client_localhost_key.pem index 241f86f..9f6dcf4 100644 --- a/.ci/certs/client_localhost_key.pem +++ b/.ci/certs/client_localhost_key.pem @@ -1,30 +1,28 @@ ------BEGIN ENCRYPTED PRIVATE KEY----- -MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQIqKZZASlLYRICAggA -MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBCQpWBZXmYQn0c6PZ4CnLrQBIIE -0HwXxx0lDzPbw53k/ak73G4CwBilSpaIM5x7jNwwD7UhiR4Qo9JiYLRy2zn0RQZJ -wK/Hhta3SKecTHqgMwPHk8s4Bu6EhSIm3/x2OhAtk2lLeubZkjgEKCfQbu4tVpeH -jOw66Pxz52fhdJ7GzaTnWjjTYmEPxNpkRiUAe0v+lOD09OQvQIFVEDyqSATzRUjd -GTvQs8H5N/XJR7xTuPRQekauY5gIcneE4oynGF5a9L870XfLh/H62f+pD19rvESh -qqdCxklxwAfHGHni2p1UKgNPxJHzSMH9dGCAGT1fxLg0RtXfBMdl3gzPnwbZ1PmB -tjVxCqtw7XAirdlBX79+dhZ58HCN+j7pkL9LWwRap1klN7Y+Iwf0XhK6imSY7Ex4 -4odxin7kF1yW65PTYKyS7cRuFip+k2YShXApN5PrF5SqNEFVt0A9RG7h+GF7EXSD -QS0ecqwhnzuHGHSpBjvsEw9z3FWBL1tFC1i2cF7m3yTHDLVQoevkUY43Fmh8S/CZ -gthQ9P58A3dIDSJM0vcGhHqJBLbxOF7rSqwIuihZJhBfqclw1V4fKk9VuRzp4MHf -NrZEuCr8CTrcYnl2n6Z/MaJ33XRg8uwwy+O5RGF1I1GAmH2KdKORUtrYHlOdTd4K -2NXEgy2mgDQYPbl/1tk8bH6hroIY9Qofpzi7MTZ++32AY3ggf4GnqAk4eAP5R3Ey -PUYFtWaGftaOQCR5Ovocdn41YitUJxAPh6hE5HqVicO2rEfx13uzug9usdg6256i -GgKSTg4jqBiEw0oJhb9TVYNY44koh9yMRM/sfidqarNKWU7bWDVKhl3hGaNhj+oX -v6ZC8rH6m/zHRtbn7tAw/q+EtTHmLo2AaUf13V4Ii6VrEXMRSlv/AyipYmOIwgV2 -EZriwyhsT2RaVesAgKExHbnP6dzX2P2IGTMNISZDNlATMT01BfWG/loPe+6DbxzW -aHv6Y0FknGeHGLDwiZMv/hyn8a4KOvIl35YZBJqZ8UxTirs9mLRd4Us5CdXAHQlL -5skAzf5FSrVbQvUbvKIrO+ULGB5mDATHR/tgOWVaP656tiRMrtFW8XGNxaPjyDPt -xhA3fVOc68f1UzTqoGpsZtUUMQxkndW3Tg50V4ssw4F9D4Grce9XXgfBdEFz/Gfc -gSR4SYKelS5udrMvqKxUs+zobx8TH2CqzwDDcC0kxqC9VCMnHqaD3wSMbN/RBoYT -lkD4DRmDFTwlsQd80i2j6K0eDFo7uvROWM72gAOb/wmssZBaSF3g0E5CrNSxApkz -+VhgqfGBYDrFZijMHiCw+XB8kFFBrlXlcBOUHu1trIi7nwcmN1JvnXL0dVOgfSGE -VNAmVz/sHdeAJacf05tehkAFTubdiZ24M/mM+VbKiJ2dajYRSoePPc8P65urlv6E -rszIC9NhfwBy0TDgXNC/GV3y8mC7rp6kzbyzEb2H2M5ltyEKOIyjvRvtks2/opSX -5T42x6xJtS6qTRttwrpRE5KjBHgcq0m8LSQu6chKwWinFUfOAdcZODvVVqLA2e6K -plfcGb027WE4DHqTzW73nbnK+NwkP3lLORbro7KWDFdNKP3v/Rx0Uq7CPGVN994G -tH7wyGheZNTMtKDFGrAdOqse9/sKcTF3thaqYqXgDDZY ------END ENCRYPTED PRIVATE KEY----- +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC7L/xjD4iHTCf2 +IfXd/fayxkX0+dI+Z2y+latMUFvn4GpDIz0Acfqjp3/NhShbWoHqOhR/w5l20J9L +jt2RmecpybK717Flst8Q0g0Cxm3GaN7fVLAxoWAIbzU7cAZMv0SRuu2RIo2HTt5i +2xBljA5Bf6wMZqMFxvnNWNGtTIWVUzCjeqWqPUi84XdHu0GWyQ11rIjCnw5zY3D8 +EFc+HoTgI33y81EABps7ybmHBdUtMsAFEXgk3lJplaLeIvlM/HzBk+ffkqpcwC6k +TnoR7Nww8a2aE6wHq91Hj+R7mmAo8Hpx0grott/pmwWOd2Ld1w3gxC3I7D6yqjfT +4Rjc6FyxAgMBAAECggEAAvFuHrGbFC4rRQMDg7NvRPV57/Awm84SUeHLtnC0rTiO ++ydrRAicEqV6zISu3dbWYD7RsoY6XA72KODiFMdjxiQCH9LJmtTSjd1mRSL7uAj5 +6MQtsVi9SDdVZy3rpMUaGHolAOlB6pIfzClFFfpQgWZMPSACVAAs+SClH/wqGoRU +LBt8uAwwx9QWqId8nuP4OorAFQYqmmzFb6Q42CanSNObaWcutmIuziOv/P9hRKcx +WbUei3Q5+DjU4ScFGXmyKzP4DxYMJM42jqocZdggHk+eL2yjdR5TmUpnR5WDcjtM +pf5lXzbPsCaFpVsQgHTZgA5OWcaztEtObnINrryb5QKBgQDtOpp5K0kO6HafC9CI +sPDmYyEtfg+IIv9rdEzHmpuj3uyeFBxO3fNE51m1pxGr2DmfekeWzG17lIFIrOkf +t9SAMVPiI5m5mKsPHt3bRE3CjJNjHTvj3tQyF7xDEsJdYyTvEGB5tTKl+t338Wq8 +1H2B0szEr8dEbt1Hf2WFz/lA1QKBgQDJ/7lQnXPC50+U0IhdFk9K/YPE1FOo3ck8 +EIi0S4A0F97N5bCoQ1n+PSCLMGDp3f/QQfX6dh/dnX+SXXlOVNbquKuy5/uIj1Lq +glA0Jj9wKFDVpZG6wziB79TQWQP6TltsQJ4NGwpuPUbyxUQBBg2t3cZy9BJpTJjQ +LUYbSnm6bQKBgQDBi6eOJjeT9ysYdc4sR5gzjzr5X7kSS+Nx6s/dphFHcFBCZIv3 ++HNKiyoQ3362YlIY/+26ZY0JX07fWVtVqmiwMg6LGJqJ5rnhO0CsbRy4FnMFUUuU +jS84s07AtmRnRsVSWl0rzx7Edll0ub1o1ECVk8PG0NbVyVG1zIWq19Q3BQKBgEOa +8LzIVawPmpTlzh3Jj7Q7cNR5c556zBTsO7SL6FaG/qzOiPdnw0DR2Ih9IpJjGHDt +ApRW4IddZQrpeeX7gwp/0AdKmOa1gTy3bHxnqKey9orqpQFqwQjL6d/pSumFPBfY +8IzWVgFbRNmPqBjnm8BrDzX99gOD/Uj/Pg14OZFpAoGAfDfDCsKbW6TJnvbIFkw3 +/V4v6WAwaUS9mECtRb00yzNtn7YbEveQ2/pKPLPZ5z8Vz9pMpuzXfoqFYUEJNsz/ +F2qNaYrvREsDbsLVqFdTTyNHPicilcM8bfmaspI72Tb/YkJNH0/cVhF6H8/J02rr +ValHWT50FbbgAY337QwDjO8= +-----END PRIVATE KEY----- diff --git a/.ci/certs/server_localhost.p12 b/.ci/certs/server_localhost.p12 index d77b7dc..437d112 100644 Binary files a/.ci/certs/server_localhost.p12 and b/.ci/certs/server_localhost.p12 differ diff --git a/.ci/certs/server_localhost_certificate.pem b/.ci/certs/server_localhost_certificate.pem index a67718d..0e72b16 100644 --- a/.ci/certs/server_localhost_certificate.pem +++ b/.ci/certs/server_localhost_certificate.pem @@ -1,22 +1,22 @@ -----BEGIN CERTIFICATE----- -MIIDvDCCAqSgAwIBAgIBATANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH -ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTEwLTA4VDA4OjE2OjAzLjU5MDQ1NDEN -MAsGA1UEBwwEJCQkJDAeFw0yMzEwMDgxNTE2MDNaFw0zMzEwMDUxNTE2MDNaMCUx -EjAQBgNVBAMMCWxvY2FsaG9zdDEPMA0GA1UECgwGc2VydmVyMIIBIjANBgkqhkiG -9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2dxp0wR++oE89W/mhEL7/XfJfo8iDbKKciUP -PyIgBvggv625HifmEJG+epl77KinbCuZdc0DX/2FKH6HPM/tC6VcWB2cZRSHpBSM -aieRV4yiaUFTqlOgQalJyRczRtv35QPdaIcDOX4lOw887sn6sJuZY5FtAyDr3opA -gZWLR+6fqi0YWqp5wqaz3hMzTGEEuu/ZKSqMWURRvp+Voz13auiShvhRb9hsdRp0 -zf12Y9wGhWjOg7G6v1r/BP6/Nr1gWrgNUhuomSFC1FCRdCr1VrLpUfG3VNloVEOG -mbWYfo+cDN6fV+PDlVB5UQp9YciFfpGXBzSXgNcsk8fEXpg8IQIDAQABo4HPMIHM -MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMBMGA1UdJQQMMAoGCCsGAQUFBwMBMCoG -A1UdEQQjMCGCCWxvY2FsaG9zdIIJUFJPS09GSUVWgglsb2NhbGhvc3QwHQYDVR0O -BBYEFPezEEGf7j3HedbaRCh4/FHT2VXrMB8GA1UdIwQYMBaAFNSsn21DVr1Xhhqm -U+wMnLWFZc55MDEGA1UdHwQqMCgwJqAkoCKGIGh0dHA6Ly9jcmwtc2VydmVyOjgw -MDAvYmFzaWMuY3JsMA0GCSqGSIb3DQEBCwUAA4IBAQBLeagmroj4FFOXgUqDQo7i -kGCBZuCmn6GnCYdwEHtMoysGZ3vNFsB1BCug4fTuL7OU1l+Xw8iVnIvnGBpKypmt -b7h9dN6urty0ewCS4WO8BTZUIdc1RJMo9N+nEMTja+5cqXHtO/VQnO2eqeALWJUU -IDPycb6HcTkHGFX0QDwxsPuMFL3p5HGr6U0llLF0J5FedxUA/YLLVCStofrWvBGT -PKngh7S6ntaIUnTvwyzY2kPJ+byqRDNrL5jdavw1U8cGh1vi3k9mf1Uloi0mnAMT -kqOPzbQmHIQjxIOwqp2xkObXgqz1b0KNDfRDTwp90wzVxOCF5JJBCAIjPyLuncDv +MIIDvDCCAqSgAwIBAgIBATANBgkqhkiG9w0BAQsFADBLMTowOAYDVQQDDDFUTFNH +ZW5TZWxmU2lnbmVkUm9vdENBIDIwMjUtMDItMjdUMTU6NDQ6NTguODgwNTMwMQ0w +CwYDVQQHDAQkJCQkMB4XDTI1MDIyNzE0NDQ1OVoXDTM1MDIyNTE0NDQ1OVowJTES +MBAGA1UEAwwJbG9jYWxob3N0MQ8wDQYDVQQKDAZzZXJ2ZXIwggEiMA0GCSqGSIb3 +DQEBAQUAA4IBDwAwggEKAoIBAQCn1MRZTV3ATEvS8jFXhci/HGup4acSa1AduNak +8fpGHSFFmrywY6cl00rmPa95nfGloqbkRydqOwMn1Pv3XfHc3UeaiBgU+FNRj9u6 +NOwJ0zR3QkqLxvQqbjrvxMN/IaZ2WL0Zem+j8YIY9yHytjkLEX2AH9AZLwHpdBLI +vSVeS3BNF/gKpXYExGNNfG47/Lo0fIgwboN069pHY/Ff80SAzUkzRcOxDplJoMWp +wym15ssmAnGzAzTrMhKIJ7rUyaE0ZNAIcid7KQ1VzB+yMpeYz5pdbx0G4U/DuVXf +j8FnwlGwGAw05CckDjZcgrWNgLz1kqEcMV/UEFlbQuEzl5kTAgMBAAGjgdAwgc0w +CQYDVR0TBAIwADALBgNVHQ8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwKwYD +VR0RBCQwIoIJbG9jYWxob3N0ggpGMjNOMDQ5MlhUgglsb2NhbGhvc3QwHQYDVR0O +BBYEFGv69aUODEtJA5QWU4KalMtGvuGYMB8GA1UdIwQYMBaAFDJd0t924S/40cxm +/LgBIUfoEhlaMDEGA1UdHwQqMCgwJqAkoCKGIGh0dHA6Ly9jcmwtc2VydmVyOjgw +MDAvYmFzaWMuY3JsMA0GCSqGSIb3DQEBCwUAA4IBAQBQxX+IwLmt9emhC/of3riN +wQaLXGYKKMHcsimGkBsQbitWlwWtBZwR2F9aOlvcOAlFbQ2Enldbdpkens1YwR4k +Fsx2VdOnumSYbq6DKZg0mMrg3AqufYLBGVPSGNksQ6qERZVD5NGATLh0kA9R3q0h +eGKJbHyrdI6fkSELkmBGbuetjmGIfmYh+OjYZhqvU5mutjdOfY9k1t08eRvdNiIB +4HxFVEk/S0opA98LkjY0wjPSAMZAWPNxHD5vHoaI6VwYnxLadD1NcasfEpae6uLW +t7CT+v6rtfBXvczfdd9rmhCmcHR5ckrL/wbpnvgkloQqxclw5IpDt/JkPyGghWx3 -----END CERTIFICATE----- diff --git a/.ci/certs/server_localhost_key.pem b/.ci/certs/server_localhost_key.pem index a49d186..93189d7 100644 --- a/.ci/certs/server_localhost_key.pem +++ b/.ci/certs/server_localhost_key.pem @@ -1,30 +1,28 @@ ------BEGIN ENCRYPTED PRIVATE KEY----- -MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQILovSnFfKBhECAggA -MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBBmLCdyyKqcbbjoi1/8A+rxBIIE -0Mmi72DP32seewlELsG4gVkOH6Gwvs5iAqHYap1yOps3mfI1TtuMhDEZDH2Sj+MB -J1E35WEzJGGxTVhvK/J+R/1fUfd44Acgl1Ks1IINJyre4+vYfDUyWB5O2lS+9mr7 -L6q7kfAbBB2OuAEuGL5GMlTRetyASXbspWbi0M+vA9R+NemYbRzFpozP/fedFpQY -6r/QnogSwuRcE1VMghUjZwzZWyG2HFMFp5emiAHRVi+SxLpIIv6wwV8SB4jDMO46 -CsyxLjkjhd2GmkMRpmIxXw7eXbWa/bnf/KhJG7gSDBgmGuoBJ4cDnQc2jFN8UqXW -IG3+K6PIeGTT/t4aC6YSq+kb8R3rTfVbPdq51Uo55uMatpJg8AatsysL900nNfuz -MejikInTz4+m6jY5kzEm+fToRNHXhcmnQeD6SYc8PNi/4QfxiMcHcI91GRNQ2nFI -Xd5a1CG4f78WGUmK9PylxBdh+1nx9yQyrZKWcShuLkOQk4UAL0w31B70/l9jVoiN -gcN4w18TUfYLIg8Ab6lL6wXipBrr1AjB/Dn2oCpMTiMolyWcsPAHDtxvrsgbsXRr -vxd/vNo+RpSsvjq2wnXhxe+qC/uHBzJeyfx0m+rs6vBKPZvS7uTBfYGG+RhVJvb5 -W2RRfprvTzgBbbKBCTJ5ry4SMZX7ci008f7oVqKLAlsApA58dDgZ+ORF4TxtdSkJ -u3r2htUBvC+mzYMYU4D+sYQ7S9qqVhKe7hvNzLW5UhkEhH57SQ1dIcstTsTYUDC7 -1o/zOkpVxByudKEGwgEtyYM+DD/YoGLGB/4qPULnHFOBwxWdK6Ov9I0ezuhe/nOA -ERe3ixLklwHRI5sM/gt57A7MiMPhFHDpqt/xO/m/uCX2VRDW/IAKXpIfxuuxDcIz -MLLxJhYCrGRHMStmBAPy3zmmhpn+wHTkwVbEVRMsh+o8M2vPelrysUtUlarRBQI+ -l5tY/UCgX0bGUvHKIp5z8GuRu/CTpjtpsyuNwtpq2TrgnmyiznyfFl4oknvEcfmF -BLUd23ZrTyn1ha8cnKXY9JSHgS2cxdU0QnkPT1BEypptf30nQ1lLqiUg9GLR+xC5 -EeHn/80gL/MrpVnWdEznJdWMzau39kqf3ajNQlUb/SX5YQaeUKYrWoLHI+UNhUG2 -5fr2vcBgk0gt7k5ZDpWejhEu0BDTf3xrE9dU2jj6hOw6E+Q5bI59QvnLYqCvqBmE -asDMBafo+/Px8xnXazFr5b5FyNqeXzBRPgRw5wFmK5YdFXU0fIpuF9IJb1TwLITp -Hk+Hn760AsT3ALzHgRzC2e6bUUO6F/iw/6s6awwRbEPpLYTHwb9Mv7efeVsGTYiM -Fi0OHapnzzbb4ErVL+92mkOT8flDoLhbKHJCRbOvu4C9awRs5aVbkEsygV67tLwu -SIgUMpdxOMYYquyCJ+WUbyv5VSyvhnUIj7u2kdH+zyAendAi4Rgx/5e4PcD62c+X -tNKp4KrlpF3jGIaPODXZVE2aIrhI0njVlUjIQRs6OOMXleO6+xWQI/1fx/xn/oKm -TBUOtW3Y7AzyojbPiScvjmT+aoVwAZ3juHnUuxEuyUcI3WokkWPpllcaGd95sCUG -7iR90VPBJ/meYyQMYY1BGq4ngi5DvLGy6K/pS5CHPi0U ------END ENCRYPTED PRIVATE KEY----- +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCn1MRZTV3ATEvS +8jFXhci/HGup4acSa1AduNak8fpGHSFFmrywY6cl00rmPa95nfGloqbkRydqOwMn +1Pv3XfHc3UeaiBgU+FNRj9u6NOwJ0zR3QkqLxvQqbjrvxMN/IaZ2WL0Zem+j8YIY +9yHytjkLEX2AH9AZLwHpdBLIvSVeS3BNF/gKpXYExGNNfG47/Lo0fIgwboN069pH +Y/Ff80SAzUkzRcOxDplJoMWpwym15ssmAnGzAzTrMhKIJ7rUyaE0ZNAIcid7KQ1V +zB+yMpeYz5pdbx0G4U/DuVXfj8FnwlGwGAw05CckDjZcgrWNgLz1kqEcMV/UEFlb +QuEzl5kTAgMBAAECggEAEaH/jRhdSLZbYwrSF011hWqxfxQ3ru46aR0B5CuOLW6j +D8KNn4Sgy48S9/S0KnVnLY1UtngpUnZnwvgUDu2+WwOeocQ5r35VlqSkI8Cqqe+Y +PA1pcp0RCyIwq/9CwOkiqZ1yJKqh7xoRHplcZjkx7hFE28C75uFy9Hme/ZstwWXF +E+6Puia3YcE1CAYiIzrdKDGL+uIVjMfXQue3JybST9CzSPk2mgTq4tGLDON82V3u +RC80YmhSrzgi9/CPBQwE2YtD3zO0RTqTE1s2efP1ApfWZDL09rBWj6P4lplFnjzk +IAW35SbP8zEtnFuMLEui2cSr0ewAPks5x5HflitxhQKBgQDSP6R2iDa1XsxsMCO+ +hAgvIKelzrI1vdOs5OmpQQonL6t0xfFbesAEKxhoRQe73nvgQechLrbTjcAMDemj +F98TC39f3TGMVi2XQMaAkJMt+3NGtYj7OTrfwIB7sFZXg9guO1EG4hEsQbP0P11S +aFEoRp+/0dRVDc5PvHz70sNz1wKBgQDMWiqENzhuk1Ha2XzRpmgLvDqZ5x1rS+2p +LvwcVAEFuK1EoOqcGy8KBYz2HHQg3dbdDlM/ptSaV1YFqB8DGW+pfAHFqiS47YQf +QHSgoYXfHm9rSDkS2gnPLK0V8rN8Ft5umWx5FkT8x7ormaxUVcSg2Dxhe3J4UCh+ +EwJhoXudJQKBgQCpceVIKkt9LOOvpbSJDLvTz4uNk+IIce6w/uRaJjLalg6m1AjK +40jxkxHepxOuk4ZenH58PbvXD/zhOi075jdAkBmd1xTht2qS5f+VCe+0NV0YdaHq +ZptOTUS/asSLT5Tg3alV1MhmVKWFibPagHw364NAAwoPaksF9DD+e0ROjQKBgH4q +VQGYTkEGt4zUphmSEb7dEZkfdaxfDnZbyc97lb4AjQlICFEk/1/CmYsBejkofZWx +WHh9+djofvWzHKJ/O895/mYZa9641c+trdPWpZ5hXgzwZDxdXZ0JSju4wlOkkuPZ +2XzQ4ProHOr6T8kpwuJDXtQYsU3Sv41HEztPxc/5AoGBAIbELsuD0WsEEmQu10hp +fCzzUan8tS5cFYEBXYI6XoUBTHv/TM5Mx+hpEqZFYOALVmYyLEC6eFKYFFZVDZ6t +Up6L91J0AhsInnON3RotUekx4l77woMufYmOSuARqU/+UjkXUfrCXnjK/064HNEO +rRdolD9MXCQidIrqwsPyHprv +-----END PRIVATE KEY----- diff --git a/.ci/ubuntu/advanced.config b/.ci/ubuntu/advanced.config new file mode 100644 index 0000000..fd7d0fd --- /dev/null +++ b/.ci/ubuntu/advanced.config @@ -0,0 +1,13 @@ +[ + {rabbitmq_auth_backend_oauth2, [{key_config, + [{signing_keys, + #{<<"token-key">> => + {map, + #{<<"alg">> => <<"HS256">>, + <<"k">> => <<"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH">>, + <<"kid">> => <<"token-key">>, + <<"kty">> => <<"oct">>, + <<"use">> => <<"sig">>, + <<"value">> => <<"token-key">>}}}}]}, + {resource_server_id,<<"rabbitmq">>}]} +]. \ No newline at end of file diff --git a/.ci/ubuntu/definitions.json b/.ci/ubuntu/definitions.json new file mode 100644 index 0000000..bfbc99d --- /dev/null +++ b/.ci/ubuntu/definitions.json @@ -0,0 +1 @@ +{"rabbit_version":"4.1.0-beta.4","rabbitmq_version":"4.1.0-beta.4","product_name":"RabbitMQ","product_version":"4.1.0-beta.4","rabbitmq_definition_format":"cluster","original_cluster_name":"rabbit@rabbitmq-amqp-go-client-rabbitmq","explanation":"Definitions of cluster 'rabbit@rabbitmq-amqp-go-client-rabbitmq'","users":[{"name":"guest","password_hash":"5AXVjnnJAKWzGy8L/t9vhOi5iZ4j2wwUA9aI0QoOgBYPXmGS","hashing_algorithm":"rabbit_password_hashing_sha256","tags":["administrator"],"limits":{}},{"name":"user_1","password_hash":"k91LVmfv+JsXCihK+BiwURDo2otPX4wRtX4vErArkhRq/kkJ","hashing_algorithm":"rabbit_password_hashing_sha256","tags":["administrator"],"limits":{}},{"name":"O=client,CN=localhost","password_hash":"n3z/QaCVGTgelie+hmxw7//jYQmtERIVOQj+tw47AoPVAsCh","hashing_algorithm":"rabbit_password_hashing_sha256","tags":["administrator"],"limits":{}}],"vhosts":[{"name":"vhost_user_1","description":"","metadata":{"description":"","tags":[],"default_queue_type":"classic"},"tags":[],"default_queue_type":"classic"},{"name":"/","description":"Default virtual host","metadata":{"description":"Default virtual host","tags":[],"default_queue_type":"classic"},"tags":[],"default_queue_type":"classic"},{"name":"tls","description":"","metadata":{"description":"","tags":[],"default_queue_type":"classic"},"tags":[],"default_queue_type":"classic"}],"permissions":[{"user":"O=client,CN=localhost","vhost":"/","configure":".*","write":".*","read":".*"},{"user":"guest","vhost":"/","configure":".*","write":".*","read":".*"},{"user":"O=client,CN=localhost","vhost":"tls","configure":".*","write":".*","read":".*"},{"user":"guest","vhost":"vhost_user_1","configure":".*","write":".*","read":".*"},{"user":"guest","vhost":"tls","configure":".*","write":".*","read":".*"},{"user":"user_1","vhost":"vhost_user_1","configure":".*","write":".*","read":".*"}],"topic_permissions":[],"parameters":[],"global_parameters":[{"name":"cluster_tags","value":[]},{"name":"internal_cluster_id","value":"rabbitmq-cluster-id-A5bx3jtkxi8ukG64KRkw8g"}],"policies":[],"queues":[],"exchanges":[],"bindings":[]} \ No newline at end of file diff --git a/.ci/ubuntu/enabled_plugins b/.ci/ubuntu/enabled_plugins index 2e81f16..1f453bb 100644 --- a/.ci/ubuntu/enabled_plugins +++ b/.ci/ubuntu/enabled_plugins @@ -1 +1 @@ -[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top]. +[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top,rabbitmq_auth_backend_oauth2]. diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index 646f930..1118c97 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -7,13 +7,7 @@ set -o xtrace script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" - -if [[ $3 == 'arm' ]] -then - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}" -else - readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}" -fi +readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine readonly docker_name_prefix='rabbitmq-amqp-go-client' @@ -91,6 +85,8 @@ function start_rabbitmq --network "$docker_network_name" \ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/enabled_plugins:/etc/rabbitmq/enabled_plugins" \ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/definitions.json:/etc/rabbitmq/definitions.json:ro" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/advanced.config:/etc/rabbitmq/advanced.config:ro" \ --volume "$GITHUB_WORKSPACE/.ci/certs:/etc/rabbitmq/certs:ro" \ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/log:/var/log/rabbitmq" \ "$rabbitmq_image" @@ -163,8 +159,7 @@ function install_ca_certificate openssl s_client -connect localhost:5671 \ -CAfile "$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem" \ -cert "$GITHUB_WORKSPACE/.ci/certs/client_localhost_certificate.pem" \ - -key "$GITHUB_WORKSPACE/.ci/certs/client_localhost_key.pem" \ - -pass pass:grapefruit < /dev/null + -key "$GITHUB_WORKSPACE/.ci/certs/client_localhost_key.pem" } docker network create "$docker_network_name" || echo "[INFO] network '$docker_network_name' is already created" diff --git a/.ci/ubuntu/rabbitmq.conf b/.ci/ubuntu/rabbitmq.conf index 829dcec..7687ffe 100644 --- a/.ci/ubuntu/rabbitmq.conf +++ b/.ci/ubuntu/rabbitmq.conf @@ -17,10 +17,14 @@ ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem ssl_options.certfile = /etc/rabbitmq/certs/server_localhost_certificate.pem ssl_options.keyfile = /etc/rabbitmq/certs/server_localhost_key.pem ssl_options.verify = verify_peer -ssl_options.password = grapefruit ssl_options.depth = 1 ssl_options.fail_if_no_peer_cert = false auth_mechanisms.1 = PLAIN auth_mechanisms.2 = ANONYMOUS auth_mechanisms.3 = EXTERNAL + +auth_backends.1 = internal +auth_backends.2 = rabbit_auth_backend_oauth2 + +load_definitions = /etc/rabbitmq/definitions.json diff --git a/Makefile b/Makefile index ff1cfa7..5a266f3 100644 --- a/Makefile +++ b/Makefile @@ -34,8 +34,8 @@ test: format vet check -rabbitmq-server-start-arm: - ./.ci/ubuntu/gha-setup.sh start pull arm +rabbitmq-server-start: + ./.ci/ubuntu/gha-setup.sh start pull rabbitmq-server-stop: ./.ci/ubuntu/gha-setup.sh stop diff --git a/docs/examples/README.md b/docs/examples/README.md index 022146c..d765d94 100644 --- a/docs/examples/README.md +++ b/docs/examples/README.md @@ -6,4 +6,6 @@ - [Streams](streams) - An example of how to use [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) with AMQP 1.0 - [Stream Filtering](streams_filtering) - An example of how to use streams [Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions) - [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties. -- [Video](video) - From the YouTube tutorial [AMQP 1.0 with Golang](https://youtu.be/iR1JUFh3udI) \ No newline at end of file +- [Video](video) - From the YouTube tutorial [AMQP 1.0 with Golang](https://youtu.be/iR1JUFh3udI) +- [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client. +- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client. \ No newline at end of file diff --git a/docs/examples/advanced_settings/advanced_settings.go b/docs/examples/advanced_settings/advanced_settings.go new file mode 100644 index 0000000..d993d70 --- /dev/null +++ b/docs/examples/advanced_settings/advanced_settings.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "fmt" + "github.com/Azure/go-amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" + "time" +) + +func main() { + + rmq.Info("Golang AMQP 1.0 Advanced connection settings") + + // rmq.NewClusterEnvironment setups the environment. + // define multiple endpoints with different connection settings + // the connection will be created based on the strategy Sequential + env := rmq.NewClusterEnvironmentWithStrategy([]rmq.Endpoint{ + + //this is correct + {Address: "amqp://localhost:5672", Options: &rmq.AmqpConnOptions{ + ContainerID: "My connection one ", + SASLType: amqp.SASLTypeAnonymous(), + RecoveryConfiguration: &rmq.RecoveryConfiguration{ + ActiveRecovery: false, + }, + OAuth2Options: nil, + Id: "my first id", + }}, + // this is correct + {Address: "amqp://localhost:5672", Options: &rmq.AmqpConnOptions{ + ContainerID: "My connection two", + SASLType: amqp.SASLTypePlain("guest", "guest"), + RecoveryConfiguration: &rmq.RecoveryConfiguration{ + ActiveRecovery: true, + BackOffReconnectInterval: 2 * time.Second, + MaxReconnectAttempts: 5, + }, + OAuth2Options: nil, + Id: "my second id", + }}, + + //this end point is incorrect, so won't be used + //so another endpoint will be used + {Address: "amqp://wrong:5672", Options: &rmq.AmqpConnOptions{ + ContainerID: "My connection wrong", + SASLType: amqp.SASLTypePlain("guest", "guest"), + RecoveryConfiguration: &rmq.RecoveryConfiguration{ + ActiveRecovery: true, + BackOffReconnectInterval: 2 * time.Second, + MaxReconnectAttempts: 5, + }, + OAuth2Options: nil, + Id: "my wrong id", + }}, + }, rmq.StrategyRandom) + + for i := 0; i < 5; i++ { + connection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + + rmq.Info("Connection opened", "Container ID", connection.Id()) + time.Sleep(200 * time.Millisecond) + + } + // Here you should see the connection opened for the first two endpoints + // with the containers ID "My connection one" and with the containers ID "My connection two" + // press any key to exit + fmt.Println("Press any key to exit") + var input string + _, _ = fmt.Scanln(&input) + +} diff --git a/docs/examples/getting_started/main.go b/docs/examples/getting_started/main.go index 7a65464..79412f1 100644 --- a/docs/examples/getting_started/main.go +++ b/docs/examples/getting_started/main.go @@ -15,7 +15,7 @@ func main() { rmq.Info("Getting started with AMQP Go AMQP 1.0 Client") - /// Create a channel to receive state change notifications + /// Create a channel to receive connection state change notifications stateChanged := make(chan *rmq.StateChanged, 1) go func(ch chan *rmq.StateChanged) { for statusChanged := range ch { @@ -23,10 +23,16 @@ func main() { } }(stateChanged) - // rmq.NewEnvironment setups the environment. + // rmq.NewClusterEnvironment setups the environment. // The environment is used to create connections // given the same parameters - env := rmq.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) + + // in case you have multiple endpoints you can use the following: + //env := rmq.NewClusterEnvironment([]rmq.Endpoint{ + // {Address: "amqp://server1", Options: &rmq.AmqpConnOptions{}}, + // {Address: "amqp://server2", Options: &rmq.AmqpConnOptions{}}, + //}) // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0) amqpConnection, err := env.NewConnection(context.Background()) @@ -74,7 +80,6 @@ func main() { // Create a consumer to receive messages from the queue // you need to build the address of the queue, but you can use the helper function - consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) if err != nil { rmq.Error("Error creating consumer", err) @@ -89,16 +94,16 @@ func main() { deliveryContext, err := consumer.Receive(ctx) if errors.Is(err, context.Canceled) { // The consumer was closed correctly - rmq.Info("[NewConsumer]", "consumer closed. Context", err) + rmq.Info("[Consumer]", "consumer closed. Context", err) return } if err != nil { // An error occurred receiving the message - rmq.Error("[NewConsumer]", "Error receiving message", err) + rmq.Error("[Consumer]", "Error receiving message", err) return } - rmq.Info("[NewConsumer]", "Received message", + rmq.Info("[Consumer]", "Received message", fmt.Sprintf("%s", deliveryContext.Message().Data)) err = deliveryContext.Accept(context.Background()) @@ -128,14 +133,14 @@ func main() { } switch publishResult.Outcome.(type) { case *rmq.StateAccepted: - rmq.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0]) + rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) case *rmq.StateReleased: - rmq.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0]) + rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) case *rmq.StateRejected: - rmq.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0]) + rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) stateType := publishResult.Outcome.(*rmq.StateRejected) if stateType.Error != nil { - rmq.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error) + rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) } default: // these status are not supported. Leave it for AMQP 1.0 compatibility @@ -153,13 +158,13 @@ func main() { //Close the consumer err = consumer.Close(context.Background()) if err != nil { - rmq.Error("[NewConsumer]", err) + rmq.Error("[Consumer]", err) return } // Close the publisher err = publisher.Close(context.Background()) if err != nil { - rmq.Error("[NewPublisher]", err) + rmq.Error("[Publisher]", err) return } diff --git a/docs/examples/publisher_msg_targets/publisher_msg_targets.go b/docs/examples/publisher_msg_targets/publisher_msg_targets.go index bed4f80..1c32ab0 100644 --- a/docs/examples/publisher_msg_targets/publisher_msg_targets.go +++ b/docs/examples/publisher_msg_targets/publisher_msg_targets.go @@ -17,8 +17,7 @@ func checkError(err error) { func main() { rmq.Info("Define the publisher message targets") - - env := rmq.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) amqpConnection, err := env.NewConnection(context.Background()) checkError(err) queues := []string{"queue1", "queue2", "queue3"} diff --git a/docs/examples/reliable/reliable.go b/docs/examples/reliable/reliable.go index a8c0812..36f3928 100644 --- a/docs/examples/reliable/reliable.go +++ b/docs/examples/reliable/reliable.go @@ -46,7 +46,7 @@ func main() { }(stateChanged) // Open a connection to the AMQP 1.0 server - amqpConnection, err := rmq.Dial(context.Background(), []string{"amqp://"}, &rmq.AmqpConnOptions{ + amqpConnection, err := rmq.Dial(context.Background(), "amqp://", &rmq.AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), ContainerID: "reliable-amqp10-go", RecoveryConfiguration: &rmq.RecoveryConfiguration{ diff --git a/docs/examples/streams/streams.go b/docs/examples/streams/streams.go index f783ea6..3c83d19 100644 --- a/docs/examples/streams/streams.go +++ b/docs/examples/streams/streams.go @@ -19,7 +19,8 @@ func main() { rmq.Info("Golang AMQP 1.0 Streams example") queueStream := "stream-go-queue-" + time.Now().String() - env := rmq.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) + amqpConnection, err := env.NewConnection(context.Background()) checkError(err) management := amqpConnection.Management() diff --git a/docs/examples/streams_filtering/streams_filtering.go b/docs/examples/streams_filtering/streams_filtering.go index ccf053c..0f23064 100644 --- a/docs/examples/streams_filtering/streams_filtering.go +++ b/docs/examples/streams_filtering/streams_filtering.go @@ -20,7 +20,7 @@ func main() { // see also: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions rmq.Info("Golang AMQP 1.0 Streams example with filtering") queueStream := "stream-go-queue-filtering-" + time.Now().String() - env := rmq.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) amqpConnection, err := env.NewConnection(context.Background()) checkError(err) management := amqpConnection.Management() diff --git a/docs/examples/tls/tls.go b/docs/examples/tls/tls.go new file mode 100644 index 0000000..534933d --- /dev/null +++ b/docs/examples/tls/tls.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "github.com/Azure/go-amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" + "os" +) + +func check(err error) { + if err != nil { + panic(err) + } +} + +func main() { + + // to run the example you can use the certificates from the rabbitmq-amqp-go-client + // inside the directory .ci/certs + caCert, err := os.ReadFile("/path/ca_certificate.pem") + check(err) + // Create a CA certificate pool and add the CA certificate to it + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Load client cert + clientCert, err := tls.LoadX509KeyPair("/path//client_localhost_certificate.pem", + "/path//client_localhost_key.pem") + check(err) + + // Create a TLS configuration + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{clientCert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + ServerName: "localhost", // the server name should match the name on the certificate + } + + env := rmq.NewClusterEnvironment([]rmq.Endpoint{ + {Address: "amqps://localhost:5671", Options: &rmq.AmqpConnOptions{ + SASLType: amqp.SASLTypeAnonymous(), + TLSConfig: tlsConfig, + }}, + }) + + connection, err := env.NewConnection(context.Background()) + check(err) + + // Close the connection + err = connection.Close(context.Background()) + check(err) + +} diff --git a/docs/examples/video/getting_started.go b/docs/examples/video/getting_started.go index 5711a70..8794648 100644 --- a/docs/examples/video/getting_started.go +++ b/docs/examples/video/getting_started.go @@ -11,7 +11,7 @@ func main() { queueName := "getting-started-go-queue" routingKey := "routing-key" - env := rmq.NewEnvironment([]string{"amqp://guest:guest@localhost:5672"}, nil) + env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0) amqpConnection, err := env.NewConnection(context.Background()) diff --git a/go.mod b/go.mod index de1dda2..9241c9a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/Azure/go-amqp v1.4.0 + github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.22.1 github.com/onsi/gomega v1.36.2 diff --git a/go.sum b/go.sum index 70986cd..1a3fd2c 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= diff --git a/pkg/rabbitmqamqp/address.go b/pkg/rabbitmqamqp/address.go index 57c6756..85d2d53 100644 --- a/pkg/rabbitmqamqp/address.go +++ b/pkg/rabbitmqamqp/address.go @@ -45,7 +45,7 @@ func (eas *ExchangeAddress) toAddress() (string, error) { } // address Creates the address for the exchange or queue following the RabbitMQ conventions. -// see: https://www.rabbitmq.com/docs/next/amqp#address-v2 +// see: https://www.rabbitmq.com/docs/amqp#address-v2 func address(exchange, key, queue *string, urlParameters *string) (string, error) { if exchange == nil && queue == nil { return "", errors.New("exchange or queue must be set") diff --git a/pkg/rabbitmqamqp/amqp_binding_test.go b/pkg/rabbitmqamqp/amqp_binding_test.go index 40cda11..090ebfd 100644 --- a/pkg/rabbitmqamqp/amqp_binding_test.go +++ b/pkg/rabbitmqamqp/amqp_binding_test.go @@ -10,7 +10,7 @@ var _ = Describe("AMQP Bindings test ", func() { var connection *AmqpConnection var management *AmqpManagement BeforeEach(func() { - conn, err := Dial(context.TODO(), []string{"amqp://"}, nil) + conn, err := Dial(context.TODO(), "amqp://", nil) Expect(err).To(BeNil()) connection = conn management = connection.Management() diff --git a/pkg/rabbitmqamqp/amqp_connection.go b/pkg/rabbitmqamqp/amqp_connection.go index 9d8e5d0..d51b628 100644 --- a/pkg/rabbitmqamqp/amqp_connection.go +++ b/pkg/rabbitmqamqp/amqp_connection.go @@ -12,18 +12,39 @@ import ( "time" ) -//func (c *ConnUrlHelper) UseSsl(value bool) { -// c.UseSsl = value -// if value { -// c.Scheme = "amqps" -// } else { -// c.Scheme = "amqp" -// } -//} +type AmqpAddress struct { + // the address of the AMQP server + // it is in the form of amqp://: + // or amqps://: + // the port is optional + // the default port is 5672 + // the default protocol is amqp + // the default host is localhost + // the default virtual host is "/" + // the default user is guest + // the default password is guest + // the default SASL type is SASLTypeAnonymous + Address string + // Options: Additional options for the connection + Options *AmqpConnOptions +} + +type OAuth2Options struct { + Token string +} + +func (o OAuth2Options) Clone() *OAuth2Options { + cloned := &OAuth2Options{ + Token: o.Token, + } + return cloned + +} type AmqpConnOptions struct { // wrapper for amqp.ConnOptions ContainerID string + // wrapper for amqp.ConnOptions HostName string // wrapper for amqp.ConnOptions @@ -51,8 +72,40 @@ type AmqpConnOptions struct { // when the connection is closed unexpectedly. RecoveryConfiguration *RecoveryConfiguration - // copy the addresses for reconnection - addresses []string + // The OAuth2Options is used to configure the connection with OAuth2 token. + OAuth2Options *OAuth2Options + + // Local connection identifier (not sent to the server) + // if not provided, a random UUID is generated + Id string +} + +func (a *AmqpConnOptions) isOAuth2() bool { + return a.OAuth2Options != nil +} + +func (a *AmqpConnOptions) Clone() *AmqpConnOptions { + + cloned := &AmqpConnOptions{ + ContainerID: a.ContainerID, + IdleTimeout: a.IdleTimeout, + MaxFrameSize: a.MaxFrameSize, + MaxSessions: a.MaxSessions, + Properties: a.Properties, + SASLType: a.SASLType, + TLSConfig: a.TLSConfig, + WriteTimeout: a.WriteTimeout, + Id: a.Id, + } + if a.OAuth2Options != nil { + cloned.OAuth2Options = a.OAuth2Options.Clone() + } + if a.RecoveryConfiguration != nil { + cloned.RecoveryConfiguration = a.RecoveryConfiguration.Clone() + } + + return cloned + } type AmqpConnection struct { @@ -60,10 +113,10 @@ type AmqpConnection struct { featuresAvailable *featuresAvailable azureConnection *amqp.Conn - id string management *AmqpManagement lifeCycle *LifeCycle amqpConnOptions *AmqpConnOptions + address string session *amqp.Session refMap *sync.Map entitiesTracker *entitiesTracker @@ -100,6 +153,12 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti destination := &QueueAddress{ Queue: queueName, } + if options != nil { + err := options.validate(a.featuresAvailable) + if err != nil { + return nil, err + } + } destinationAdd, err := destination.toAddress() if err != nil { @@ -111,16 +170,53 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti // Dial connect to the AMQP 1.0 server using the provided connectionSettings // Returns a pointer to the new AmqpConnection if successful else an error. -// addresses is a list of addresses to connect to. It picks one randomly. -// It is enough that one of the addresses is reachable. -func Dial(ctx context.Context, addresses []string, connOptions *AmqpConnOptions, args ...string) (*AmqpConnection, error) { +func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*AmqpConnection, error) { + connOptions, err := validateOptions(connOptions) + if err != nil { + return nil, err + } + + // create the connection + conn := &AmqpConnection{ + management: newAmqpManagement(), + lifeCycle: NewLifeCycle(), + amqpConnOptions: connOptions, + entitiesTracker: newEntitiesTracker(), + featuresAvailable: newFeaturesAvailable(), + } + + err = conn.open(ctx, address, connOptions) + if err != nil { + return nil, err + } + conn.amqpConnOptions = connOptions + conn.address = address + conn.lifeCycle.SetState(&StateOpen{}) + return conn, nil + +} + +func validateOptions(connOptions *AmqpConnOptions) (*AmqpConnOptions, error) { if connOptions == nil { - connOptions = &AmqpConnOptions{ - // RabbitMQ requires SASL security layer - // to be enabled for AMQP 1.0 connections. - // So this is mandatory and default in case not defined. - SASLType: amqp.SASLTypeAnonymous(), + connOptions = &AmqpConnOptions{} + } + if connOptions.SASLType == nil { + // RabbitMQ requires SASL security layer + // to be enabled for AMQP 1.0 connections. + // So this is mandatory and default in case not defined. + connOptions.SASLType = amqp.SASLTypeAnonymous() + } + + if connOptions.Id == "" { + connOptions.Id = uuid.New().String() + } + + // In case of OAuth2 token, the SASLType should be set to SASLTypePlain + if connOptions.isOAuth2() { + if connOptions.OAuth2Options.Token == "" { + return nil, fmt.Errorf("OAuth2 token is empty") } + connOptions.SASLType = amqp.SASLTypePlain("", connOptions.OAuth2Options.Token) } if connOptions.RecoveryConfiguration == nil { @@ -135,37 +231,28 @@ func Dial(ctx context.Context, addresses []string, connOptions *AmqpConnOptions, return nil, fmt.Errorf("BackOffReconnectInterval should be greater than 1 second") } - // create the connection - - conn := &AmqpConnection{ - management: NewAmqpManagement(), - lifeCycle: NewLifeCycle(), - amqpConnOptions: connOptions, - entitiesTracker: newEntitiesTracker(), - featuresAvailable: newFeaturesAvailable(), - } - tmp := make([]string, len(addresses)) - copy(tmp, addresses) - - err := conn.open(ctx, addresses, connOptions, args...) - if err != nil { - return nil, err - } - conn.amqpConnOptions = connOptions - conn.amqpConnOptions.addresses = addresses - conn.lifeCycle.SetState(&StateOpen{}) - return conn, nil - + return connOptions, nil } // Open opens a connection to the AMQP 1.0 server. // using the provided connectionSettings and the AMQPLite library. // Setups the connection and the management interface. -func (a *AmqpConnection) open(ctx context.Context, addresses []string, connOptions *AmqpConnOptions, args ...string) error { +func (a *AmqpConnection) open(ctx context.Context, address string, connOptions *AmqpConnOptions) error { + + // random pick and extract one address to use for connection + var azureConnection *amqp.Conn + //connOptions.hostName is the way to set the virtual host + // so we need to pre-parse the URI to get the virtual host + // the PARSE is copied from go-amqp091 library + // the URI will be parsed is parsed again in the amqp lite library + uri, err := ParseURI(address) + if err != nil { + return err + } amqpLiteConnOptions := &amqp.ConnOptions{ ContainerID: connOptions.ContainerID, - HostName: connOptions.HostName, + HostName: fmt.Sprintf("vhost:%s", uri.Vhost), IdleTimeout: connOptions.IdleTimeout, MaxFrameSize: connOptions.MaxFrameSize, MaxSessions: connOptions.MaxSessions, @@ -174,90 +261,59 @@ func (a *AmqpConnection) open(ctx context.Context, addresses []string, connOptio TLSConfig: connOptions.TLSConfig, WriteTimeout: connOptions.WriteTimeout, } - tmp := make([]string, len(addresses)) - copy(tmp, addresses) - - // random pick and extract one address to use for connection - var azureConnection *amqp.Conn - for len(tmp) > 0 { - idx := random(len(tmp)) - addr := tmp[idx] - //connOptions.HostName is the way to set the virtual host - // so we need to pre-parse the URI to get the virtual host - // the PARSE is copied from go-amqp091 library - // the URI will be parsed is parsed again in the amqp lite library - uri, err := ParseURI(addr) - if err != nil { - return err - } - connOptions.HostName = fmt.Sprintf("vhost:%s", uri.Vhost) - // remove the index from the tmp list - tmp = append(tmp[:idx], tmp[idx+1:]...) - azureConnection, err = amqp.Dial(ctx, addr, amqpLiteConnOptions) - if err != nil { - Error("Failed to open connection", ExtractWithoutPassword(addr), err) - continue - } - a.properties = azureConnection.Properties() - err = a.featuresAvailable.ParseProperties(a.properties) - if err != nil { - Warn("Validate properties Error.", ExtractWithoutPassword(addr), err) - } - - if !a.featuresAvailable.is4OrMore { - Warn("The server version is less than 4.0.0", ExtractWithoutPassword(addr)) - } - - if !a.featuresAvailable.isRabbitMQ { - Warn("The server is not RabbitMQ", ExtractWithoutPassword(addr)) - } - - Debug("Connected to", ExtractWithoutPassword(addr)) - break + azureConnection, err = amqp.Dial(ctx, address, amqpLiteConnOptions) + if err != nil { + Error("Failed to open connection", ExtractWithoutPassword(address), err, "ID", connOptions.Id) + return fmt.Errorf("failed to open connection: %w", err) } - if azureConnection == nil { - return fmt.Errorf("failed to connect to any of the provided addresses") + a.properties = azureConnection.Properties() + err = a.featuresAvailable.ParseProperties(a.properties) + if err != nil { + Warn("Validate properties Error.", ExtractWithoutPassword(address), err) } - if len(args) > 0 { - a.id = args[0] - } else { - a.id = uuid.New().String() + if !a.featuresAvailable.is4OrMore { + Warn("The server version is less than 4.0.0", ExtractWithoutPassword(address), "ID", connOptions.Id) } + if !a.featuresAvailable.isRabbitMQ { + Warn("The server is not RabbitMQ", ExtractWithoutPassword(address)) + } + + Debug("Connected to", ExtractWithoutPassword(address), "ID", connOptions.Id) a.azureConnection = azureConnection - var err error a.session, err = a.azureConnection.NewSession(ctx, nil) + if err != nil { + return fmt.Errorf("failed to open session, for the connection id:%s, error: %w", a.Id(), err) + } go func() { <-azureConnection.Done() { a.lifeCycle.SetState(&StateClosed{error: azureConnection.Err()}) if azureConnection.Err() != nil { - Error("connection closed unexpectedly", "error", azureConnection.Err()) + Error("connection closed unexpectedly", "error", azureConnection.Err(), "ID", a.Id()) a.maybeReconnect() return } - Debug("connection closed successfully") + Debug("connection closed successfully", "ID", a.Id()) } }() - if err != nil { - return err - } err = a.management.Open(ctx, a) if err != nil { // TODO close connection? return err } + Debug("Management interface opened", "ID", a.Id()) return nil } func (a *AmqpConnection) maybeReconnect() { if !a.amqpConnOptions.RecoveryConfiguration.ActiveRecovery { - Info("Recovery is disabled, closing connection") + Info("Recovery is disabled, closing connection", "ID", a.Id()) return } a.lifeCycle.SetState(&StateReconnecting{}) @@ -277,12 +333,12 @@ func (a *AmqpConnection) maybeReconnect() { delay = maxDelay } - Info("Attempting reconnection", "attempt", attempt, "delay", delay) + Info("Attempting reconnection", "attempt", attempt, "delay", delay, "ID", a.Id()) time.Sleep(delay) // context with timeout ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) // try to createSender - err := a.open(ctx, a.amqpConnOptions.addresses, a.amqpConnOptions) + err := a.open(ctx, a.address, a.amqpConnOptions) cancel() if err == nil { @@ -291,7 +347,7 @@ func (a *AmqpConnection) maybeReconnect() { return } baseDelay *= 2 - Error("Reconnection attempt failed", "attempt", attempt, "error", err) + Error("Reconnection attempt failed", "attempt", attempt, "error", err, "ID", a.Id()) } } @@ -308,7 +364,7 @@ func (a *AmqpConnection) restartEntities() { if err := publisher.createSender(ctx); err != nil { atomic.AddInt32(&publisherFails, 1) - Error("Failed to restart publisher", "ID", publisher.Id(), "error", err) + Error("Failed to restart publisher", "ID", publisher.Id(), "error", err, "ID", a.Id()) } return true }) @@ -321,7 +377,7 @@ func (a *AmqpConnection) restartEntities() { if err := consumer.createReceiver(ctx); err != nil { atomic.AddInt32(&consumerFails, 1) - Error("Failed to restart consumer", "ID", consumer.Id(), "error", err) + Error("Failed to restart consumer", "ID", consumer.Id(), "error", err, "ID", a.Id()) } return true }) @@ -350,7 +406,7 @@ func (a *AmqpConnection) Close(ctx context.Context) error { err := a.management.Close(ctx) if err != nil { - Error("Failed to close management", "error:", err) + Error("Failed to close management", "error:", err, "ID", a.Id()) } err = a.azureConnection.Close() a.close() @@ -360,7 +416,7 @@ func (a *AmqpConnection) Close(ctx context.Context) error { // NotifyStatusChange registers a channel to receive getState change notifications // from the connection. func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged) { - a.lifeCycle.chStatusChanged = channel + a.lifeCycle.notifyStatusChange(channel) } func (a *AmqpConnection) State() ILifeCycleState { @@ -368,7 +424,7 @@ func (a *AmqpConnection) State() ILifeCycleState { } func (a *AmqpConnection) Id() string { - return a.id + return a.amqpConnOptions.Id } // *** management section *** @@ -379,4 +435,24 @@ func (a *AmqpConnection) Management() *AmqpManagement { return a.management } +func (a *AmqpConnection) RefreshToken(background context.Context, token string) error { + if !a.amqpConnOptions.isOAuth2() { + return fmt.Errorf("the connection is not configured to use OAuth2 token") + } + + if a.amqpConnOptions.isOAuth2() && !a.featuresAvailable.is41OrMore { + return fmt.Errorf("the server does not support OAuth2 token, you need to upgrade to RabbitMQ 4.1 or later") + } + + err := a.Management().refreshToken(background, token) + if err != nil { + return err + } + // update the SASLType in case of reconnect after token refresh + // it should use the new token + a.amqpConnOptions.SASLType = amqp.SASLTypePlain("", token) + return nil + +} + //*** end management section *** diff --git a/pkg/rabbitmqamqp/amqp_connection_recovery.go b/pkg/rabbitmqamqp/amqp_connection_recovery.go index 76c02f0..5a3871d 100644 --- a/pkg/rabbitmqamqp/amqp_connection_recovery.go +++ b/pkg/rabbitmqamqp/amqp_connection_recovery.go @@ -29,6 +29,17 @@ type RecoveryConfiguration struct { MaxReconnectAttempts int } +func (c *RecoveryConfiguration) Clone() *RecoveryConfiguration { + cloned := &RecoveryConfiguration{ + ActiveRecovery: c.ActiveRecovery, + BackOffReconnectInterval: c.BackOffReconnectInterval, + MaxReconnectAttempts: c.MaxReconnectAttempts, + } + + return cloned + +} + func NewRecoveryConfiguration() *RecoveryConfiguration { return &RecoveryConfiguration{ ActiveRecovery: true, diff --git a/pkg/rabbitmqamqp/amqp_connection_recovery_test.go b/pkg/rabbitmqamqp/amqp_connection_recovery_test.go index 34f93ca..5a4481a 100644 --- a/pkg/rabbitmqamqp/amqp_connection_recovery_test.go +++ b/pkg/rabbitmqamqp/amqp_connection_recovery_test.go @@ -23,7 +23,7 @@ var _ = Describe("Recovery connection test", func() { */ name := "connection should reconnect producers and consumers if dropped by via REST API" - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), ContainerID: name, // reduced the reconnect interval to speed up the test @@ -32,6 +32,7 @@ var _ = Describe("Recovery connection test", func() { BackOffReconnectInterval: 2 * time.Second, MaxReconnectAttempts: 5, }, + Id: "reconnect producers and consumers", }) Expect(err).To(BeNil()) ch := make(chan *StateChanged, 1) @@ -136,7 +137,7 @@ var _ = Describe("Recovery connection test", func() { It("connection should not reconnect producers and consumers if the auto-recovery is disabled", func() { name := "connection should reconnect producers and consumers if dropped by via REST API" - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), ContainerID: name, // reduced the reconnect interval to speed up the test @@ -174,7 +175,7 @@ var _ = Describe("Recovery connection test", func() { It("validate the Recovery connection parameters", func() { - _, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + _, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), // reduced the reconnect interval to speed up the test RecoveryConfiguration: &RecoveryConfiguration{ @@ -186,7 +187,7 @@ var _ = Describe("Recovery connection test", func() { Expect(err).NotTo(BeNil()) Expect(err.Error()).To(ContainSubstring("BackOffReconnectInterval should be greater than")) - _, err = Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + _, err = Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), RecoveryConfiguration: &RecoveryConfiguration{ ActiveRecovery: true, diff --git a/pkg/rabbitmqamqp/amqp_connection_test.go b/pkg/rabbitmqamqp/amqp_connection_test.go index 1bdecdf..317ea71 100644 --- a/pkg/rabbitmqamqp/amqp_connection_test.go +++ b/pkg/rabbitmqamqp/amqp_connection_test.go @@ -2,24 +2,29 @@ package rabbitmqamqp import ( "context" + "crypto/tls" + "crypto/x509" + "fmt" "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "os" + "sync" "time" ) var _ = Describe("AMQP connection Test", func() { It("AMQP SASLTypeAnonymous connection should succeed", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous()}) Expect(err).To(BeNil()) err = connection.Close(context.Background()) Expect(err).To(BeNil()) }) - + // It("AMQP SASLTypePlain connection should succeed", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypePlain("guest", "guest")}) Expect(err).To(BeNil()) @@ -28,38 +33,17 @@ var _ = Describe("AMQP connection Test", func() { err = connection.Close(context.Background()) Expect(err).To(BeNil()) }) - - It("AMQP connection connect to the one correct uri and fails the others", func() { - conn, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://"}, nil) - Expect(err).To(BeNil()) - Expect(conn.Close(context.Background())) - }) - - It("AMQP connection should fail due of wrong Port", func() { - _, err := Dial(context.Background(), []string{"amqp://localhost:1234"}, nil) - Expect(err).NotTo(BeNil()) - }) - - It("AMQP connection should fail due of wrong Host", func() { - _, err := Dial(context.Background(), []string{"amqp://wrong_host:5672"}, nil) - Expect(err).NotTo(BeNil()) - }) - - It("AMQP connection should fails with all the wrong uris", func() { - _, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://nono"}, nil) - Expect(err).NotTo(BeNil()) - }) - + // It("AMQP connection should fail due to context cancellation", func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) cancel() - _, err := Dial(ctx, []string{"amqp://"}, nil) + _, err := Dial(ctx, "amqp://", nil) Expect(err).NotTo(BeNil()) }) - + // It("AMQP connection should receive events", func() { ch := make(chan *StateChanged, 1) - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) connection.NotifyStatusChange(ch) err = connection.Close(context.Background()) @@ -72,7 +56,7 @@ var _ = Describe("AMQP connection Test", func() { }) It("Entity tracker should be aligned with consumers and publishers ", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous()}) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) @@ -130,22 +114,54 @@ var _ = Describe("AMQP connection Test", func() { Expect(err).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil()) - }) - //It("AMQP TLS connection should success with SASLTypeAnonymous ", func() { - // amqpConnection := NewAmqpConnection() - // Expect(amqpConnection).NotTo(BeNil()) - // Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - // - // connectionSettings := NewConnUrlHelper(). - // UseSsl(true).Port(5671).TlsConfig(&tls.Config{ - // //ServerName: "localhost", - // InsecureSkipVerify: true, - // }) - // Expect(connectionSettings).NotTo(BeNil()) - // Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnUrlHelper{})) - // err := amqpConnection.Open(context.Background(), connectionSettings) - // Expect(err).To(BeNil()) - //}) + Describe("AMQP TLS connection should succeed with in different vhosts with Anonymous and External.", func() { + wg := &sync.WaitGroup{} + wg.Add(4) + DescribeTable("TLS connection should success in different vhosts ", func(virtualHost string, sasl amqp.SASLType) { + // Load CA cert + caCert, err := os.ReadFile("../../.ci/certs/ca_certificate.pem") + Expect(err).To(BeNil()) + + // Create a CA certificate pool and add the CA certificate to it + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Load client cert + clientCert, err := tls.LoadX509KeyPair("../../.ci/certs/client_localhost_certificate.pem", + "../../.ci/certs/client_localhost_key.pem") + Expect(err).To(BeNil()) + + // Create a TLS configuration + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{clientCert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + ServerName: "localhost", + } + + // Dial the AMQP server with TLS configuration + connection, err := Dial(context.Background(), fmt.Sprintf("amqps://localhost:5671/%s", virtualHost), &AmqpConnOptions{ + SASLType: sasl, + TLSConfig: tlsConfig, + }) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + + // Close the connection + err = connection.Close(context.Background()) + Expect(err).To(BeNil()) + wg.Done() + }, + Entry("with virtual host. External", "%2F", amqp.SASLTypeExternal("")), + Entry("with a not default virtual host. External", "tls", amqp.SASLTypeExternal("")), + Entry("with virtual host. Anonymous", "%2F", amqp.SASLTypeAnonymous()), + Entry("with a not default virtual host. Anonymous", "tls", amqp.SASLTypeAnonymous()), + ) + go func() { + wg.Wait() + }() + }) + }) diff --git a/pkg/rabbitmqamqp/amqp_consumer_stream_test.go b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go index eadaca9..ecd3b02 100644 --- a/pkg/rabbitmqamqp/amqp_consumer_stream_test.go +++ b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go @@ -28,7 +28,7 @@ var _ = Describe("Consumer stream test", func() { */ qName := generateName("start consuming with different offset types") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ Name: qName, @@ -125,7 +125,7 @@ var _ = Describe("Consumer stream test", func() { */ qName := generateName("consumer should restart form the last offset in case of disconnection") - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), ContainerID: qName, RecoveryConfiguration: &RecoveryConfiguration{ @@ -188,7 +188,7 @@ var _ = Describe("Consumer stream test", func() { }) It("consumer should filter messages based on x-stream-filter", func() { qName := generateName("consumer should filter messages based on x-stream-filter") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ Name: qName, @@ -320,7 +320,7 @@ var _ = Describe("Consumer stream test", func() { Describe("consumer should filter messages based on application properties", func() { qName := generateName("consumer should filter messages based on application properties") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ Name: qName, @@ -394,7 +394,7 @@ var _ = Describe("Consumer stream test", func() { */ qName := generateName("consumer should filter messages based on properties") qName += time.Now().String() - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ Name: qName, @@ -558,7 +558,7 @@ var _ = Describe("Consumer stream test", func() { type msgLogic = func(*amqp.Message) func publishMessagesWithMessageLogic(queue string, label string, count int, logic msgLogic) { - conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil) + conn, err := Dial(context.TODO(), "amqp://guest:guest@localhost", nil) Expect(err).To(BeNil()) publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queue}, diff --git a/pkg/rabbitmqamqp/amqp_consumer_test.go b/pkg/rabbitmqamqp/amqp_consumer_test.go index 40945a0..6a5bd73 100644 --- a/pkg/rabbitmqamqp/amqp_consumer_test.go +++ b/pkg/rabbitmqamqp/amqp_consumer_test.go @@ -13,7 +13,7 @@ var _ = Describe("NewConsumer tests", func() { It("AMQP NewConsumer should fail due to context cancellation", func() { qName := generateNameWithDateTime("AMQP NewConsumer should fail due to context cancellation") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ @@ -33,7 +33,7 @@ var _ = Describe("NewConsumer tests", func() { It("AMQP NewConsumer should ack and empty the queue", func() { qName := generateNameWithDateTime("AMQP NewConsumer should ack and empty the queue") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ Name: qName, @@ -62,7 +62,7 @@ var _ = Describe("NewConsumer tests", func() { It("AMQP NewConsumer should requeue the message to the queue", func() { qName := generateNameWithDateTime("AMQP NewConsumer should requeue the message to the queue") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ Name: qName, @@ -90,7 +90,7 @@ var _ = Describe("NewConsumer tests", func() { It("AMQP NewConsumer should requeue the message to the queue with annotations", func() { qName := generateNameWithDateTime("AMQP NewConsumer should requeue the message to the queue with annotations") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ Name: qName, @@ -126,7 +126,7 @@ var _ = Describe("NewConsumer tests", func() { It("AMQP NewConsumer should discard the message to the queue with and without annotations", func() { // TODO: Implement this test with a dead letter queue to test the discard feature qName := generateNameWithDateTime("AMQP NewConsumer should discard the message to the queue with and without annotations") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ Name: qName, diff --git a/pkg/rabbitmqamqp/amqp_environment.go b/pkg/rabbitmqamqp/amqp_environment.go index dd55901..6e51fc3 100644 --- a/pkg/rabbitmqamqp/amqp_environment.go +++ b/pkg/rabbitmqamqp/amqp_environment.go @@ -3,47 +3,103 @@ package rabbitmqamqp import ( "context" "fmt" + "github.com/Azure/go-amqp" "sync" + "sync/atomic" ) -type Environment struct { - connections sync.Map - addresses []string - connOptions *AmqpConnOptions +type TEndPointStrategy int + +const ( + StrategyRandom TEndPointStrategy = iota + StrategySequential TEndPointStrategy = iota +) + +type Endpoint struct { + Address string + Options *AmqpConnOptions } -func NewEnvironment(addresses []string, connOptions *AmqpConnOptions) *Environment { +func DefaultEndpoints() []Endpoint { + ep := Endpoint{ + Address: "amqp://", + Options: &AmqpConnOptions{ + SASLType: amqp.SASLTypeAnonymous(), + }, + } + + return []Endpoint{ep} +} + +type Environment struct { + connections sync.Map + endPoints []Endpoint + EndPointStrategy TEndPointStrategy + nextConnectionId int32 +} + +func NewEnvironment(address string, options *AmqpConnOptions) *Environment { + return NewClusterEnvironmentWithStrategy([]Endpoint{{Address: address, Options: options}}, StrategyRandom) +} + +func NewClusterEnvironment(endPoints []Endpoint) *Environment { + return NewClusterEnvironmentWithStrategy(endPoints, StrategyRandom) +} + +func NewClusterEnvironmentWithStrategy(endPoints []Endpoint, strategy TEndPointStrategy) *Environment { return &Environment{ - connections: sync.Map{}, - addresses: addresses, - connOptions: connOptions, + connections: sync.Map{}, + endPoints: endPoints, + EndPointStrategy: strategy, + nextConnectionId: 0, } } // NewConnection get a new connection from the environment. -// If the connection id is provided, it will be used as the connection id. -// If the connection id is not provided, a new connection id will be generated. -// The connection id is unique in the environment. +// It picks an endpoint from the list of endpoints, based on EndPointStrategy, and tries to open a connection. +// It fails if all the endpoints are not reachable. // The Environment will keep track of the connection and close it when the environment is closed. -func (e *Environment) NewConnection(ctx context.Context, args ...string) (*AmqpConnection, error) { - if len(args) > 0 && len(args[0]) > 0 { - // check if connection already exists - if _, ok := e.connections.Load(args[0]); ok { - return nil, fmt.Errorf("connection with id %s already exists", args[0]) - } - } +func (e *Environment) NewConnection(ctx context.Context) (*AmqpConnection, error) { - connection, err := Dial(ctx, e.addresses, e.connOptions, args...) - if err != nil { - return nil, err + tmp := make([]Endpoint, len(e.endPoints)) + copy(tmp, e.endPoints) + lastError := error(nil) + for len(tmp) > 0 { + idx := 0 + + switch e.EndPointStrategy { + case StrategyRandom: + idx = random(len(tmp)) + case StrategySequential: + idx = 0 + } + + addr := tmp[idx] + // remove the index from the tmp list + tmp = append(tmp[:idx], tmp[idx+1:]...) + var cloned *AmqpConnOptions + if addr.Options != nil { + cloned = addr.Options.Clone() + } + connection, err := Dial(ctx, addr.Address, cloned) + if err != nil { + Error("Failed to open connection", ExtractWithoutPassword(addr.Address), err) + lastError = err + continue + } + + // here we use it to make each connection unique + atomic.AddInt32(&e.nextConnectionId, 1) + connection.amqpConnOptions.Id = fmt.Sprintf("%s_%d", connection.amqpConnOptions.Id, e.nextConnectionId) + e.connections.Store(connection.Id(), connection) + + connection.refMap = &e.connections + return connection, nil } - e.connections.Store(connection.Id(), connection) - connection.refMap = &e.connections - return connection, nil + return nil, fmt.Errorf("fail to open connection. Last error: %w", lastError) } // Connections gets the active connections in the environment - func (e *Environment) Connections() []*AmqpConnection { connections := make([]*AmqpConnection, 0) e.connections.Range(func(key, value interface{}) bool { diff --git a/pkg/rabbitmqamqp/amqp_environment_test.go b/pkg/rabbitmqamqp/amqp_environment_test.go index 4125f79..e41eb83 100644 --- a/pkg/rabbitmqamqp/amqp_environment_test.go +++ b/pkg/rabbitmqamqp/amqp_environment_test.go @@ -2,13 +2,14 @@ package rabbitmqamqp import ( "context" + "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) var _ = Describe("AMQP Environment Test", func() { It("AMQP Environment connection should succeed", func() { - env := NewEnvironment([]string{"amqp://"}, nil) + env := NewClusterEnvironment([]Endpoint{{Address: "amqp://"}}) Expect(env).NotTo(BeNil()) Expect(env.Connections()).NotTo(BeNil()) Expect(len(env.Connections())).To(Equal(0)) @@ -22,7 +23,7 @@ var _ = Describe("AMQP Environment Test", func() { }) It("AMQP Environment CloseConnections should remove all the elements form the list", func() { - env := NewEnvironment([]string{"amqp://"}, nil) + env := NewClusterEnvironment([]Endpoint{{Address: "amqp://"}}) Expect(env).NotTo(BeNil()) Expect(env.Connections()).NotTo(BeNil()) Expect(len(env.Connections())).To(Equal(0)) @@ -36,22 +37,83 @@ var _ = Describe("AMQP Environment Test", func() { Expect(len(env.Connections())).To(Equal(0)) }) - It("AMQP Environment connection ID should be unique", func() { - env := NewEnvironment([]string{"amqp://"}, nil) + It("Get new connection should connect to the one correct uri and fails the others", func() { + + env := NewClusterEnvironment([]Endpoint{{Address: "amqp://localhost:1234"}, {Address: "amqp://nohost:555"}, {Address: "amqp://"}}) + conn, err := env.NewConnection(context.Background()) + Expect(err).To(BeNil()) + Expect(conn.Close(context.Background())) + }) + + It("Get new connection should fail due of wrong Port", func() { + env := NewClusterEnvironment([]Endpoint{{Address: "amqp://localhost:1234"}}) + _, err := env.NewConnection(context.Background()) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP connection should fail due of wrong Host", func() { + env := NewClusterEnvironment([]Endpoint{{Address: "amqp://wrong_host:5672"}}) + _, err := env.NewConnection(context.Background()) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP connection should fails with all the wrong uris", func() { + env := NewClusterEnvironment([]Endpoint{{Address: "amqp://localhost:1234"}, {Address: "amqp://nohost:555"}, {Address: "amqp://nono"}}) + _, err := env.NewConnection(context.Background()) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP connection should success in different vhosts", func() { + // user_1 and vhost_user_1 are preloaded in the rabbitmq server during the startup + env := NewEnvironment("amqp://user_1:user_1@localhost:5672/vhost_user_1", nil) Expect(env).NotTo(BeNil()) Expect(env.Connections()).NotTo(BeNil()) Expect(len(env.Connections())).To(Equal(0)) - connection, err := env.NewConnection(context.Background(), "myConnectionId") + conn, err := env.NewConnection(context.Background()) Expect(err).To(BeNil()) - Expect(connection).NotTo(BeNil()) - Expect(len(env.Connections())).To(Equal(1)) - connectionShouldBeNil, err := env.NewConnection(context.Background(), "myConnectionId") - Expect(err).NotTo(BeNil()) - Expect(err.Error()).To(ContainSubstring("connection with id myConnectionId already exists")) - Expect(connectionShouldBeNil).To(BeNil()) - Expect(len(env.Connections())).To(Equal(1)) - Expect(connection.Close(context.Background())).To(BeNil()) - Expect(len(env.Connections())).To(Equal(0)) - + Expect(conn.Close(context.Background())) }) + + It("AMQP connection should fail with user_1 does not have the grant for /", func() { + // user_1 is preloaded in the rabbitmq server during the startup + env := NewEnvironment("amqp://user_1:user_1@localhost:5672/", nil) + Expect(env).NotTo(BeNil()) + _, err := env.NewConnection(context.Background()) + Expect(err).NotTo(BeNil()) + }) + + Describe("Environment strategy", func() { + DescribeTable("Environment with strategy should success", func(strategy TEndPointStrategy) { + env := NewClusterEnvironmentWithStrategy([]Endpoint{{Address: "amqp://", Options: &AmqpConnOptions{Id: "my"}}, {Address: "amqp://nohost:555"}, {Address: "amqp://nono"}}, StrategyRandom) + Expect(env).NotTo(BeNil()) + Expect(env.Connections()).NotTo(BeNil()) + Expect(len(env.Connections())).To(Equal(0)) + conn, err := env.NewConnection(context.Background()) + Expect(err).To(BeNil()) + Expect(conn.Id()).To(Equal("my_1")) + Expect(conn.Close(context.Background())) + }, + Entry("StrategyRandom", StrategyRandom), + Entry("StrategySequential", StrategySequential), + ) + }) + + Describe("Environment should success even partial options", func() { + DescribeTable("Environment should success even partial options", func(options *AmqpConnOptions) { + + env := NewClusterEnvironment([]Endpoint{{Address: "amqp://", Options: options}}) + Expect(env).NotTo(BeNil()) + Expect(env.Connections()).NotTo(BeNil()) + Expect(len(env.Connections())).To(Equal(0)) + conn, err := env.NewConnection(context.Background()) + Expect(err).To(BeNil()) + Expect(conn.Close(context.Background())) + + }, + Entry("Partial options", &AmqpConnOptions{Id: "my"}), + Entry("Partial options", &AmqpConnOptions{SASLType: amqp.SASLTypeAnonymous()}), + Entry("Partial options", &AmqpConnOptions{ContainerID: "cid_my"}), + ) + }) + }) diff --git a/pkg/rabbitmqamqp/amqp_exchange_test.go b/pkg/rabbitmqamqp/amqp_exchange_test.go index db348dc..de964bb 100644 --- a/pkg/rabbitmqamqp/amqp_exchange_test.go +++ b/pkg/rabbitmqamqp/amqp_exchange_test.go @@ -11,7 +11,7 @@ var _ = Describe("AMQP Exchange test ", func() { var connection *AmqpConnection var management *AmqpManagement BeforeEach(func() { - conn, err := Dial(context.TODO(), []string{"amqp://"}, nil) + conn, err := Dial(context.TODO(), "amqp://", nil) connection = conn Expect(err).To(BeNil()) management = connection.Management() diff --git a/pkg/rabbitmqamqp/amqp_management.go b/pkg/rabbitmqamqp/amqp_management.go index 574b49c..f20b6a8 100644 --- a/pkg/rabbitmqamqp/amqp_management.go +++ b/pkg/rabbitmqamqp/amqp_management.go @@ -25,7 +25,7 @@ type AmqpManagement struct { lifeCycle *LifeCycle } -func NewAmqpManagement() *AmqpManagement { +func newAmqpManagement() *AmqpManagement { return &AmqpManagement{ lifeCycle: NewLifeCycle(), } @@ -243,6 +243,11 @@ func (a *AmqpManagement) PurgeQueue(ctx context.Context, name string) (int, erro return purge.Purge(ctx) } +func (a *AmqpManagement) refreshToken(ctx context.Context, token string) error { + _, err := a.Request(ctx, []byte(token), authTokens, commandPut, []int{responseCode204}) + return err +} + func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged) { a.lifeCycle.chStatusChanged = channel } diff --git a/pkg/rabbitmqamqp/amqp_management_test.go b/pkg/rabbitmqamqp/amqp_management_test.go index 708f922..40daa10 100644 --- a/pkg/rabbitmqamqp/amqp_management_test.go +++ b/pkg/rabbitmqamqp/amqp_management_test.go @@ -11,7 +11,7 @@ import ( var _ = Describe("Management tests", func() { It("AMQP Management should fail due to context cancellation", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) @@ -23,7 +23,7 @@ var _ = Describe("Management tests", func() { It("AMQP Management should receive events", func() { ch := make(chan *StateChanged, 2) - connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), RecoveryConfiguration: &RecoveryConfiguration{ ActiveRecovery: false, @@ -43,7 +43,7 @@ var _ = Describe("Management tests", func() { It("Request", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) management := connection.Management() @@ -67,7 +67,7 @@ var _ = Describe("Management tests", func() { It("GET on non-existing queue returns ErrDoesNotExist", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) management := connection.Management() diff --git a/pkg/rabbitmqamqp/amqp_publisher_test.go b/pkg/rabbitmqamqp/amqp_publisher_test.go index b22134b..5617bf6 100644 --- a/pkg/rabbitmqamqp/amqp_publisher_test.go +++ b/pkg/rabbitmqamqp/amqp_publisher_test.go @@ -10,7 +10,7 @@ import ( var _ = Describe("AMQP publisher ", func() { It("Send a message to a queue with a Message Target NewPublisher", func() { qName := generateNameWithDateTime("Send a message to a queue with a Message Target NewPublisher") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ @@ -36,7 +36,7 @@ var _ = Describe("AMQP publisher ", func() { }) It("NewPublisher should fail to a not existing exchange", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) exchangeName := "Nope" @@ -48,7 +48,7 @@ var _ = Describe("AMQP publisher ", func() { It("publishResult should released to a not existing routing key", func() { eName := generateNameWithDateTime("publishResult should released to a not existing routing key") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) exchange, err := connection.Management().DeclareExchange(context.Background(), &TopicExchangeSpecification{ @@ -75,7 +75,7 @@ var _ = Describe("AMQP publisher ", func() { It("Send a message to a deleted queue should fail", func() { qName := generateNameWithDateTime("Send a message to a deleted queue should fail") - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) _, err = connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ @@ -97,7 +97,7 @@ var _ = Describe("AMQP publisher ", func() { }) It("Multi Targets NewPublisher should fail with StateReleased when the destination does not exist", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) publisher, err := connection.NewPublisher(context.Background(), nil, nil) @@ -121,7 +121,7 @@ var _ = Describe("AMQP publisher ", func() { }) It("Multi Targets NewPublisher should success with StateReceived when the destination exists", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) Expect(err).To(BeNil()) @@ -177,7 +177,7 @@ var _ = Describe("AMQP publisher ", func() { }) It("Multi Targets NewPublisher should fail it TO is not set or not valid", func() { - connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) publisher, err := connection.NewPublisher(context.Background(), nil, nil) diff --git a/pkg/rabbitmqamqp/amqp_queue_test.go b/pkg/rabbitmqamqp/amqp_queue_test.go index 51b97e7..53ead23 100644 --- a/pkg/rabbitmqamqp/amqp_queue_test.go +++ b/pkg/rabbitmqamqp/amqp_queue_test.go @@ -11,7 +11,7 @@ var _ = Describe("AMQP Queue test ", func() { var connection *AmqpConnection var management *AmqpManagement BeforeEach(func() { - conn, err := Dial(context.TODO(), []string{"amqp://"}, nil) + conn, err := Dial(context.TODO(), "amqp://", nil) Expect(err).To(BeNil()) connection = conn management = connection.Management() @@ -244,7 +244,7 @@ var _ = Describe("AMQP Queue test ", func() { }) func publishMessages(queueName string, count int, args ...string) { - conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil) + conn, err := Dial(context.TODO(), "amqp://guest:guest@localhost", nil) Expect(err).To(BeNil()) publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queueName}, nil) diff --git a/pkg/rabbitmqamqp/amqp_types.go b/pkg/rabbitmqamqp/amqp_types.go index 7918961..a4b49b5 100644 --- a/pkg/rabbitmqamqp/amqp_types.go +++ b/pkg/rabbitmqamqp/amqp_types.go @@ -1,6 +1,7 @@ package rabbitmqamqp import ( + "fmt" "github.com/Azure/go-amqp" "github.com/google/uuid" ) @@ -40,6 +41,9 @@ type IConsumerOptions interface { // id returns the id of the consumer id() string + + // validate the consumer options based on the available features + validate(available *featuresAvailable) error } func getInitialCredits(co IConsumerOptions) int32 { @@ -76,6 +80,10 @@ func (mo *managementOptions) id() string { return "management" } +func (mo *managementOptions) validate(available *featuresAvailable) error { + return nil +} + // ConsumerOptions represents the options for quorum and classic queues type ConsumerOptions struct { //ReceiverLinkName: see the IConsumerOptions interface @@ -102,6 +110,10 @@ func (aco *ConsumerOptions) id() string { return aco.Id } +func (aco *ConsumerOptions) validate(available *featuresAvailable) error { + return nil +} + type IOffsetSpecification interface { toLinkFilter() amqp.LinkFilter } @@ -274,6 +286,15 @@ func (sco *StreamConsumerOptions) id() string { return sco.Id } +func (sco *StreamConsumerOptions) validate(available *featuresAvailable) error { + if sco.StreamFilterOptions != nil && sco.StreamFilterOptions.Properties != nil { + if !available.is41OrMore { + return fmt.Errorf("stream consumer with properties filter is not supported. You need RabbitMQ 4.1 or later") + } + } + return nil +} + ///// PublisherOptions ///// type IPublisherOptions interface { diff --git a/pkg/rabbitmqamqp/amqp_utils.go b/pkg/rabbitmqamqp/amqp_utils.go index 9c777cc..8a01bb4 100644 --- a/pkg/rabbitmqamqp/amqp_utils.go +++ b/pkg/rabbitmqamqp/amqp_utils.go @@ -2,10 +2,9 @@ package rabbitmqamqp import ( "fmt" + "github.com/Azure/go-amqp" "math/rand" "time" - - "github.com/Azure/go-amqp" ) const AtMostOnce = 0 diff --git a/pkg/rabbitmqamqp/common.go b/pkg/rabbitmqamqp/common.go index 072cf9d..f20d0ae 100644 --- a/pkg/rabbitmqamqp/common.go +++ b/pkg/rabbitmqamqp/common.go @@ -29,6 +29,7 @@ const ( key = "key" queues = "queues" bindings = "bindings" + authTokens = "/auth/tokens" ) func validatePositive(label string, value int64) error { diff --git a/pkg/rabbitmqamqp/features_available_test.go b/pkg/rabbitmqamqp/features_available_test.go index d45ec5c..afb5de8 100644 --- a/pkg/rabbitmqamqp/features_available_test.go +++ b/pkg/rabbitmqamqp/features_available_test.go @@ -2,6 +2,7 @@ package rabbitmqamqp import ( "fmt" + "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -108,6 +109,24 @@ var _ = Describe("Available Features", func() { Expect(availableFeatures.is4OrMore).To(BeTrue()) Expect(availableFeatures.is41OrMore).To(BeTrue()) Expect(availableFeatures.isRabbitMQ).To(BeTrue()) - }) + + It("StreamConsumerOptions validate for RabbitMQ 4.1", func() { + Expect((&StreamConsumerOptions{ + StreamFilterOptions: &StreamFilterOptions{ + Properties: &amqp.MessageProperties{ + MessageID: "123", + }, + }, + }).validate(&featuresAvailable{is41OrMore: false})).To(MatchError("stream consumer with properties filter is not supported. You need RabbitMQ 4.1 or later")) + + Expect((&StreamConsumerOptions{ + StreamFilterOptions: &StreamFilterOptions{ + Properties: &amqp.MessageProperties{ + MessageID: "123", + }, + }, + }).validate(&featuresAvailable{is41OrMore: true})).To(BeNil()) + }) + }) diff --git a/pkg/rabbitmqamqp/life_cycle.go b/pkg/rabbitmqamqp/life_cycle.go index 3894181..e6feb9e 100644 --- a/pkg/rabbitmqamqp/life_cycle.go +++ b/pkg/rabbitmqamqp/life_cycle.go @@ -122,3 +122,9 @@ func (l *LifeCycle) SetState(value ILifeCycleState) { To: value, } } + +func (l *LifeCycle) notifyStatusChange(channel chan *StateChanged) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.chStatusChanged = channel +} diff --git a/pkg/rabbitmqamqp/oauth2_test.go b/pkg/rabbitmqamqp/oauth2_test.go new file mode 100644 index 0000000..6c1dfb4 --- /dev/null +++ b/pkg/rabbitmqamqp/oauth2_test.go @@ -0,0 +1,207 @@ +package rabbitmqamqp + +// test the OAuth2 connection + +import ( + "context" + "encoding/base64" + "github.com/golang-jwt/jwt/v5" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + testhelper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper" + "math/rand" + "time" +) + +const Base64Key = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH" + +// const HmacKey KEY = new HmacKey(Base64.getDecoder().decode(Base64Key)); +const AUDIENCE = "rabbitmq" + +// Helper function to generate random string +func randomString(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + result := make([]byte, length) + for i := range result { + result[i] = charset[rand.Intn(len(charset))] + } + return string(result) +} + +var _ = Describe("OAuth2 Tests", func() { + It("OAuth2 Connection should success", func() { + tokenString := token(time.Now().Add(time.Duration(2500) * time.Millisecond)) + Expect(tokenString).NotTo(BeEmpty()) + + conn, err := Dial(context.TODO(), "amqp://localhost:5672", + &AmqpConnOptions{ + ContainerID: "oAuth2Test", + OAuth2Options: &OAuth2Options{ + Token: tokenString, + }, + }) + Expect(err).To(BeNil()) + Expect(conn).NotTo(BeNil()) + qName := generateName("OAuth2 Connection should success") + _, err = conn.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + Expect(conn.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(conn.Close(context.Background())).To(BeNil()) + }) + + It("OAuth2 Connection should disconnect after the timeout", func() { + + tokenString := token(time.Now().Add(time.Duration(1_000) * time.Millisecond)) + Expect(tokenString).NotTo(BeEmpty()) + + conn, err := Dial(context.TODO(), "amqp://localhost:5672", + &AmqpConnOptions{ + ContainerID: "oAuth2TestTimeout", + OAuth2Options: &OAuth2Options{ + Token: tokenString, + }, + RecoveryConfiguration: &RecoveryConfiguration{ + ActiveRecovery: false, + }, + }) + Expect(err).To(BeNil()) + Expect(conn).NotTo(BeNil()) + ch := make(chan *StateChanged, 1) + go func() { + defer GinkgoRecover() + for statusChanged := range ch { + x := statusChanged.To.(*StateClosed) + Expect(x.GetError()).NotTo(BeNil()) + Expect(x.GetError().Error()).To(ContainSubstring("credential expired")) + } + }() + + conn.NotifyStatusChange(ch) + time.Sleep(1 * time.Second) + }) + + It("OAuth2 Connection should be alive after token refresh", func() { + tokenString := token(time.Now().Add(time.Duration(1) * time.Second)) + Expect(tokenString).NotTo(BeEmpty()) + + conn, err := Dial(context.TODO(), "amqp://localhost:5672", + &AmqpConnOptions{ + ContainerID: "oAuth2Test", + OAuth2Options: &OAuth2Options{ + Token: tokenString, + }, + RecoveryConfiguration: &RecoveryConfiguration{ + ActiveRecovery: false, + }, + }) + Expect(err).To(BeNil()) + Expect(conn).NotTo(BeNil()) + time.Sleep(100 * time.Millisecond) + err = conn.RefreshToken(context.Background(), token(time.Now().Add(time.Duration(2500)*time.Millisecond))) + time.Sleep(1 * time.Second) + Expect(err).To(BeNil()) + Expect(conn.Close(context.Background())).To(BeNil()) + }) + + // this test is a bit flaky, it may fail if the connection is not closed in time + // that should mark as flakes + + It("OAuth2 Connection should use the new token to reconnect", func() { + name := "oAuth2TestReconnect_" + time.Now().String() + startToken := token(time.Now().Add(time.Duration(1) * time.Second)) + connection, err := Dial(context.Background(), "amqp://", &AmqpConnOptions{ + OAuth2Options: &OAuth2Options{ + Token: startToken, + }, + ContainerID: name, + // reduced the reconnect interval to speed up the test + RecoveryConfiguration: &RecoveryConfiguration{ + ActiveRecovery: true, + BackOffReconnectInterval: 1100 * time.Millisecond, + MaxReconnectAttempts: 5, + }, + }) + + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + ch := make(chan *StateChanged, 1) + connection.NotifyStatusChange(ch) + newToken := token(time.Now().Add(time.Duration(10) * time.Second)) + Expect(connection.RefreshToken(context.Background(), newToken)).To(BeNil()) + time.Sleep(1 * time.Second) + // here the token used during the connection (startToken) is expired + // the new token should be used to reconnect. + // The test is to validate that the client uses the new token to reconnect + // The RefreshToken requests a new token and updates the connection with the new token + Eventually(func() bool { + err := testhelper.DropConnectionContainerID(name) + return err == nil + }).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue()) + st1 := <-ch + Expect(st1.From).To(Equal(&StateOpen{})) + Expect(st1.To).To(BeAssignableToTypeOf(&StateClosed{})) + + time.Sleep(1 * time.Second) + + // the connection should not be reconnected + Eventually(func() bool { + conn, err := testhelper.GetConnectionByContainerID(name) + return err == nil && conn != nil + }).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("Setting OAuth2 on the Environment should work", func() { + env := NewClusterEnvironment([]Endpoint{ + {Address: "amqp://", Options: &AmqpConnOptions{ + OAuth2Options: &OAuth2Options{ + Token: token(time.Now().Add(time.Duration(10) * time.Second)), + }, + }, + }}) + + Expect(env).NotTo(BeNil()) + Expect(env.Connections()).NotTo(BeNil()) + Expect(len(env.Connections())).To(Equal(0)) + connection, err := env.NewConnection(context.Background()) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + Expect(len(env.Connections())).To(Equal(1)) + Expect(connection.Close(context.Background())).To(BeNil()) + Expect(len(env.Connections())).To(Equal(0)) + }) + + It("Can't use refresh token if not OAuth2 is enabled ", func() { + connection, err := Dial(context.Background(), "amqp://", nil) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + err = connection.RefreshToken(context.Background(), token(time.Now().Add(time.Duration(10)*time.Second))) + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(ContainSubstring("is not configured to use OAuth2 token")) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + +}) + +func token(duration time.Time) string { + decodedKey, _ := base64.StdEncoding.DecodeString(Base64Key) + + claims := jwt.MapClaims{ + "iss": "unit_test", + "aud": AUDIENCE, + "exp": jwt.NewNumericDate(duration), + "scope": []string{"rabbitmq.configure:*/*", "rabbitmq.write:*/*", "rabbitmq.read:*/*"}, + "random": randomString(6), + } + + // Create a new token object, specifying signing method and the claims + token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) + token.Header["kid"] = "token-key" + + // Sign and get the complete encoded token as a string using the secret + tokenString, err := token.SignedString(decodedKey) + Expect(err).To(BeNil()) + return tokenString +} diff --git a/pkg/test-helper/http_utils.go b/pkg/test-helper/http_utils.go index 2d8b5aa..0269da7 100644 --- a/pkg/test-helper/http_utils.go +++ b/pkg/test-helper/http_utils.go @@ -62,7 +62,6 @@ func DropConnectionContainerID(Id string) error { if err != nil { return err } - return nil } @@ -74,6 +73,7 @@ func DropConnection(name string, port string) error { return nil } + func httpGet(url, username, password string) (string, error) { return baseCall(url, username, password, "GET") } @@ -106,6 +106,11 @@ func baseCall(url, username, password string, method string) (string, error) { return string(bodyBytes), nil } + if resp.StatusCode == 201 { + // Created! it is ok + return "", nil + } + if resp.StatusCode == 204 { // No Content return "", nil }