From fa7d5d9413c43e5a5278203f5fdd5fc4342031fc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 10 Sep 2024 11:26:46 +0200 Subject: [PATCH] Implement connection and management (#1) * Implement connection and management * Setup CI * First version for queue declaration --------- Signed-off-by: Gabriele Santomaggio --- .ci/certs/ca_certificate.pem | 21 ++ .ci/certs/ca_key.pem | 30 +++ .ci/certs/client_localhost.p12 | Bin 0 -> 3651 bytes .ci/certs/client_localhost_certificate.pem | 22 ++ .ci/certs/client_localhost_key.pem | 30 +++ .ci/certs/server_localhost.p12 | Bin 0 -> 3651 bytes .ci/certs/server_localhost_certificate.pem | 22 ++ .ci/certs/server_localhost_key.pem | 30 +++ .ci/publish-documentation-to-github-pages.sh | 49 ++++ .ci/ubuntu/enabled_plugins | 1 + .ci/ubuntu/gha-log-check.sh | 16 ++ .ci/ubuntu/gha-setup.sh | 180 +++++++++++++++ .ci/ubuntu/log/.gitkeep | 0 .ci/ubuntu/rabbitmq.conf | 26 +++ .github/workflows/build-test.yaml | 33 +++ .github/workflows/main.yaml | 11 + .github/workflows/publish-nuget.yaml | 27 +++ .github/workflows/publish.yaml | 15 ++ .gitignore | 4 + Makefile | 21 ++ README.md | 14 +- examples/getting_started/main.go | 57 +++++ go.mod | 24 ++ go.sum | 38 ++++ rabbitmq_amqp/amqp_connection.go | 179 +++++++++++++++ rabbitmq_amqp/amqp_connection_test.go | 96 ++++++++ rabbitmq_amqp/amqp_managent.go | 222 +++++++++++++++++++ rabbitmq_amqp/amqp_managent_test.go | 70 ++++++ rabbitmq_amqp/amqp_queue.go | 178 +++++++++++++++ rabbitmq_amqp/amqp_queue_test.go | 153 +++++++++++++ rabbitmq_amqp/common.go | 117 ++++++++++ rabbitmq_amqp/connection.go | 35 +++ rabbitmq_amqp/converters.go | 79 +++++++ rabbitmq_amqp/converters_test.go | 55 +++++ rabbitmq_amqp/entities.go | 52 +++++ rabbitmq_amqp/life_cycle.go | 53 +++++ rabbitmq_amqp/management.go | 16 ++ rabbitmq_amqp/pkg_suite_test.go | 13 ++ 38 files changed, 1988 insertions(+), 1 deletion(-) create mode 100644 .ci/certs/ca_certificate.pem create mode 100644 .ci/certs/ca_key.pem create mode 100644 .ci/certs/client_localhost.p12 create mode 100644 .ci/certs/client_localhost_certificate.pem create mode 100644 .ci/certs/client_localhost_key.pem create mode 100644 .ci/certs/server_localhost.p12 create mode 100644 .ci/certs/server_localhost_certificate.pem create mode 100644 .ci/certs/server_localhost_key.pem create mode 100755 .ci/publish-documentation-to-github-pages.sh create mode 100644 .ci/ubuntu/enabled_plugins create mode 100755 .ci/ubuntu/gha-log-check.sh create mode 100755 .ci/ubuntu/gha-setup.sh create mode 100644 .ci/ubuntu/log/.gitkeep create mode 100644 .ci/ubuntu/rabbitmq.conf create mode 100644 .github/workflows/build-test.yaml create mode 100644 .github/workflows/main.yaml create mode 100644 .github/workflows/publish-nuget.yaml create mode 100644 .github/workflows/publish.yaml create mode 100644 Makefile create mode 100644 examples/getting_started/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 rabbitmq_amqp/amqp_connection.go create mode 100644 rabbitmq_amqp/amqp_connection_test.go create mode 100644 rabbitmq_amqp/amqp_managent.go create mode 100644 rabbitmq_amqp/amqp_managent_test.go create mode 100644 rabbitmq_amqp/amqp_queue.go create mode 100644 rabbitmq_amqp/amqp_queue_test.go create mode 100644 rabbitmq_amqp/common.go create mode 100644 rabbitmq_amqp/connection.go create mode 100644 rabbitmq_amqp/converters.go create mode 100644 rabbitmq_amqp/converters_test.go create mode 100644 rabbitmq_amqp/entities.go create mode 100644 rabbitmq_amqp/life_cycle.go create mode 100644 rabbitmq_amqp/management.go create mode 100644 rabbitmq_amqp/pkg_suite_test.go diff --git a/.ci/certs/ca_certificate.pem b/.ci/certs/ca_certificate.pem new file mode 100644 index 0000000..f1ae0b2 --- /dev/null +++ b/.ci/certs/ca_certificate.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDhjCCAm6gAwIBAgIUJ2lTbiccSFtA9+8eGPQD5yGJ7w8wDQYJKoZIhvcNAQEL +BQAwTDE7MDkGA1UEAwwyVExTR2VuU2VsZlNpZ25lZHRSb290Q0EgMjAyMy0xMC0w +OFQwODoxNjowMy41OTA0NTQxDTALBgNVBAcMBCQkJCQwHhcNMjMxMDA4MTUxNjAz +WhcNMzMxMDA1MTUxNjAzWjBMMTswOQYDVQQDDDJUTFNHZW5TZWxmU2lnbmVkdFJv +b3RDQSAyMDIzLTEwLTA4VDA4OjE2OjAzLjU5MDQ1NDENMAsGA1UEBwwEJCQkJDCC +ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANdiiGj37094gAHfVpbIQHfu +ccBVozpexrYjDCbjw4IyJJOajJRNGbYZwEt3Jt5NaDc+zyoBZpKaZWDEjOxbNYkd +MtIHyFW4V4ooA6pySR9pzMI91dXoCkzL9Ex23Zrj0KF70qBQuPTbF5bnAbMELFuv +quFnfMw2ALsFrWh2DOwnMlt1hbdj6Iapl2yRGhVSgsr72SK+67b+b7WH02VGDrfm +Y3qqx3xAI6woKSE2Ot14Csak/iR1xit68X5GhzvSdOos0Yo3I4v8mlFEO+kpKWB0 +7y3Hb5AU/hqvSOwLRA+CV09bxN4N5rOfFHkPVuVMXQzX9mLCxzxroZn/sQzkrtMC +AwEAAaNgMF4wDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAQYwHQYDVR0OBBYE +FNSsn21DVr1XhhqmU+wMnLWFZc55MB8GA1UdIwQYMBaAFNSsn21DVr1XhhqmU+wM +nLWFZc55MA0GCSqGSIb3DQEBCwUAA4IBAQDRc1mAERvR1VPOaMevcB2pGaQfRLkN +fYgiO7rxd77FIQLieCbNNZ6z/fDRkBjgZ9xzAWl0vFJ0xZ0FSdUtAXEa9ES7r7eq +XOSW/5CRPeib4nMDeQvTSiCx5QqlIz4oUwW9bbpPcBQXM0IVZwo1Jbye/BGrjAmQ +Z3a5ph0f85Shjy2Yt9JB9BDCWOK8EU294CiKMUvdtQwSaQpl8GQfmvzWKAL4encu +ryEAPTDT9zuQi2bOCDY5QMwVNS6mDAsqbvMjOaHD/Cdzl26rgv+8QLVNDUvGfGtD +58bWugHyxCdnDToCtIEaJaoi7izKd0bILbuQXS7oKfryJpHwO+9U8ZjT +-----END CERTIFICATE----- diff --git a/.ci/certs/ca_key.pem b/.ci/certs/ca_key.pem new file mode 100644 index 0000000..75349f7 --- /dev/null +++ b/.ci/certs/ca_key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQIm6kLjkvzznECAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECGjVddOBQ1QOBIIEyOAafHUtExxT +tY2ONQkUkXZG4/fIDvuNwt8IIkNUIGVp9WEDd4Mh5Ofa52uUKmlhj+FyRZ6u2mGT +VHU65e4kBYB10n0oybRPvRU1tFxgr8qI0T7Fqnx7WJAP3m0Bo/tWfqE0GHRrspZV +gABLVTOFvHE8oOsEh/ndMe+Y2qGaLsl+MF3jkfYAxSK2QwEK9HDa16Xsit7hqVbz +JUyvBmQVfTZzanIall+EpUntv/vlILKIlAFOZUXIZ/iL8LTQCmpycfGLknr4/9KP +gCYZmWFS18X9KVAwgV2kSdUebWH9phDosSw6fZh843l1SQvjG65PgrnWYb6Fw7B4 +s3Nk6bXjHYtvLT19EUrQOdeOegynaQQBs5WIcp9LbKT3LJVQpaVGV9thi+LPz1Bu +Lep583ayXTecA7Dbfa6S9R97TgRoMdDWaz1kTBReQTUhrL5736A38gpwJeBZDqel +39sRULCKARz2ZX0YpeZCmfVhVVSguO5gCfACsqHoOiTxYOA97GR128BcpEVJ1lst +sZZNwT3m6xIcXbS37EImhUMGiQ5fyGZ+8FIozTL9xNopIR97b3ceA9CoLc7EVcFC +RxHvh1HwtpyBDyopJp2wYu31nqcSDsJh+lmjo5R7bqvDDmflfkfu1G45JkXKr3Vz +M89S/y6Uo8W/EYT2MPYTsqcobtjx6oM1RYkVuYTR6cyUgQkHGtptkzGKxYE8dYwQ +4EIm87czYvCW0Mrp6yy0NGKzqBb+19Kuqc0HO+YezEQ8RjOVb8+D+cuCp2ZSItJ/ +S9m7BDTOzTS2lBotrFVkSbzaQafAmxQiaSP7gd0M9dnC0AOB2ILbyRAyIDQ0Y2dm +kMbiewQwNFiY9moRtgzHuHRfFZu4w996Q20cYZyMbxDfY17QoZQzfKWQH1BD7nq5 +G4RFpInt6q4q0F94nQWCif195VZF64+8ETMteJqtBFhUSQbq7PzKdpuf8NFxczLt +MDEWg2l6qNLP+zswulcVbFcC/HxAu4UtYf2m5MAtaurXZZ/+xPW5c/0caWMycQ7g +fbkYvC4j0OT7aqqMd1SYzEx7l75Vqn1sr2BsXZFoaqK2c/1LIb6U1kAhyhDQ46rV +0v6q4GUk4fdnE4N+9MXWBvlKSnqEVYlE54IuSUrYRuuBhO4LQpPMOAafQPR6QCTI +ikqWVmLAj50n7uba0Ao9lRKR7bFpdOQob/nYMTKT6YQaohYhbCv4zIK9fDgWWiXE +a2ecIP3KiZzw7oLMKXLcDt1RkkzE1FQxLfOeZ5EP4RwBGPDvR88ELO+lGQQt3VnS +FIZoXBUFUf7bEUzTwM4240zkjDYQPxD1j769Zq/JZfKyOEXXOJT8xHiwMg3ARWuE +hGlNKApbJGMn9myC61KaGMyCKRvMVxI25w3LfI4OAWt+67BB5OuAG11nmn9Kja73 +bhMFDIMZ8kE0p9IWfpiUJlDB9odGEc4z3Jl5CqBVDkMCDxq9BQDM0hSDk+ov8FO9 +g03PqMxvsxd2c56vkMtNY4hSGkYfN0RsM3vTXXLtPwRwRZURCmKK76BmsT4oBd+W +orqH4SABIAbYTwNOb7k/wOc4EfucawBqMG4g+29qewD67+EXjB0GadqOXRoQyhRq +hd74uUK5gzJOqStqiowQ0A== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/.ci/certs/client_localhost.p12 b/.ci/certs/client_localhost.p12 new file mode 100644 index 0000000000000000000000000000000000000000..1d688c585d57664312bd4c9ba4b8db8594f43c08 GIT binary patch literal 3651 zcmai%XD}QJ_r`bG)rsC)2tt$3E(4M7l}OATBTuLA(h<5U>7` z7a<5dy?-s7JOrNNAL-8$;7=3(*8xI|{u#i;5OoOmzatTZ5ro%RS>LZ#4d%eY!UI5v zaPa=`8Wuhn0Aa$xe~p9!Vb~xb4~TRZ;s zOm7yN(4T-}LiBf_lW&I>Aj*{9=UuC{TaT#pp&NYLons!UB-JbxIUDDIcUF|ArBpph z_MP8s9FTtG$+XQ|o|xM9bFP?9nt8?69R6UYMB_)x4(iQ}MR{V5_fiZ^AVp zrjS}&I*rf7-fax5Yh&Qg_W>f}Z`|e=`IWFF5ArfLm6CVRk*n5Q^oG2)K8jbvSayjq z8S(Bc2f=pg)~ygBk4aGe7VDAFgN?!OepnW56~x0C4B#gc$7BQJww;}WbJ@0^BaHLQ zwUAZAgE?Ab?SixKBxx8D#3kKAO+DdE8JifT%4KI#$Ir3G#^6jfqZ||J1TI}A(c}&N zjZ)e`_iFvl)+4$cQ-1}kEgfnB3%zP6(OwDFf|?vd_A{X_GAC7D@rQQw-`HTV#DG9s zlojdN*A0OsJFHVrrX(vlr|;ouhgP@8f$5V(#=~2vL2fdv$RGMp@<8L31-^|@b6o1>f>~*-4^p$1N}DN|U}iQsteZ5LW&lm~le;@svA& ziUl%vKW-2lYuNd2yTp0pzc8GOd-XQ!)vOySG5v-{D$Y;Um&OAwo+-+CRY|ShYy?K_ zdg>wW)L$)SiQu(cos3@HtY&$C5>kHuFrYYE@8cO!+m$M$iX@`9%j#RKT0r%%3Li?E zFPntUMznA!xg*c;z6T@taw7VRw%WtAYo%eHxd*aF@e;nDGzehZq}rN@vf1jhgfDRZ zlpr%_4rZ7!7*Cim9dlwx%T(+^zd7_~LevL3_}yGS3V(d;R(}YU#pU)m+~~g3=;qUG z+3&H}AM_$d&2U%JT9YwJSDVKB#TBRk=GP_(P(5vyUh!`Rmq0IDq(d~YTmZ7QStc^s zP_eIbMw7YK3ZmW=_M=i$IUNR=3KO6D0BU#3=ixUf%wwe(@0#t;Rnr}xUYnRWHHyvK z3#6L#EG5``K>)|scq&|KpEb>Ra^Ayktv`cp1cRA-ag2!Mc06rI%E==>*e~MS@VaMe&Nz<#glc$ZdeSDrLkA-ricT3IGD9^-_gRPFV=QcD3w@aR6X!fqUK$dUmo;c(o=P1J5u`w*{O-)qpp~ zKa+^|39Rzo?*Bin^y9@RgkU1B&1x<$@u>1xk>Kg_ueweBYgYcSax$u^Gk#_ zW%+=bAdbRJ4>{AWYNqgw$b%Mf6`O3om^*7;zw8EDiXrhed*!JnbQ`BuVdE1|nQ>IN zSc=3;Lmg6K&j@+HW{N{irK;)t?upFX8_Cqn1mU4lN~??o-Flzj$(KyR(p%h=Qwgfw zP(V3-5#iNw_!(#aKsw|~q3yCY_4nxb5WCNzZ0wI%@jfl(E3^H;%?YCgA^QAFTVD65 zzkFHpn;#yeT6Gke*)K-C52GDz1w$x}&`-T!Jb6f8laG)Z-q@*<29e=2sfnRO=m2{* zJozYzb`9@p`d8_=r#Hd#xsw$Aq#j<~$ca7T_M=Cv{Gm~ij6t-Cfu{g?2>f`yqB^TO zLDFlk1?qMiX)>qjfGaqZCv8Hyz_G|vBEpPOVlE;1JGM5^q;Ro6m9Kp4=PP?lj&GAm z+^%I3xG54G;Dz_sFT~GcNxC^KC>Yeh^5`dx9$yAl!^@(6ci=1W_&w#+xV}=I*~-jC z&Fd?xgyM@!>|Q%$5(phj&%9w{W6IUj_CHkSE{gvYGn!(Y*xGeu;@wFmoAR95rl7nj zmhh1duYee-L)$4Us)oj&+j-$wiw2j4KyE0Iqhs+-EpOAgzCKgV;Zn_VLr@QwVt?Wd zo^sPl$@d9Rc2UvoOrWI|TNP%&^<5Yp z(-NHJ#-QEZ*2YhG@Ugf*)_1VMnq{^5iF0tXllLV(@>$S!}%5r^>qS%ioHi1o)5{3G)HFTmKo zB&?s1LKFN?zzEm2l@+EoB>f`Pk*qsFZU{nvM(=v9<6`h%#4$`DZ(IP`xQBE>@M&7*@Zf4*xn#w6LZmH7HlB#XBB{Q8{1JV(4%$vl>Q(xNw#^Q4Q>Yg1_YDrR|LiJius zaWJ95$e<>SuuWPFfNeFB(#QgfbGlT3oQXI1!BsP%%(UHBclm_s=KV>@N=sG0_jNO7 zo!xCoG0lUg`RNx46w}+4PpRKbael{VCviF_eE(W_YHztM5GI@BeGZ)hIkcV-# z?;+(@G+fP+q5di~%6>JH`J$=MZzlP9MPYJ*!;;5^T#M#rCqqt@^Sue{*$Ne861+}L zJvzs<4TGYE>4u~?pV+6%Bom17?sRKZ_a8*TcK}JexZdaM6 zuATuurGDx}=C5`|PTKfjY){9hPiSJ*&;98fK;twY=RY?*nU6j+GzTseb}fWtf0J=s zKMM27g4G}!oH(pJA0XG+V@vauOS)mQR|liIu}-> zJ$G_S#6bQVNd{#Y>uU8FUkR5NgMO$p@&s%m$|1n3uu$3ewzopv&$p$}kvUrRbnFeh zxj;q3-%*De7e&Q4(1PT}VL=7K>`GO;Y@DaGIetSGuG_d^F1}< zWTZ2AY#c|2NnFaFRkACIZI6pURWRn_Mc7_H`?RCm{glNcEx{Q1BDl?V#!t4JzpkO^ z*<}r-28}Mw=OQW-i^1xm3;+vY6|6-qYW<4w#qp+EC37NF9vVs*zq(S(wx&lj&-Ylg z4dt(m#$dHZ6gH0=fK%Dm&)BuLtCTmhLeF(=YuUSA&EZPS=_Tnck8_mhQho2lJKR+* zygwqX7?az=_xu$L7spjOM$aDmR9HZGrO@Gs*iv;S-HX7*`WApc$~;2?7XzBQ$6w5W zCZy|D!VL3 z)|&Wmtul0{tXjG?(d*9P&k|V~#UOQRbsuI5C85X1wMJ*uC3c%i$Cc$XKhRw}{aPF} z@S9hfAvF2F6EaztgZphM9qR2q_r?Uy=s_+V9}D7J&!=fmFRDm`Ssf?8Gd%U{s_%6U zAcjlaot%1%2047@ix2#M9ygrsB_*c|h}n^<7*YH?|3?1YH4kcn6Y(tXnlX4|)cq@Y zjb(vX@UbApWH*6gXcl#_uScfpZoW$zlpw? zf#)nmKs+N-vPA~oxS27%RUC|X2=N(wI97vx+0zGO)KP%7%3xbKLlHVi zGUWmc8R2$^Ft&7treg`Cpjh){9#CElaXj{8+arlVFBaz%Kvwe?s^4?-(w7cNhQu#d zs-t_x{lqOoX?o3FeJZF^=!)>`( zk8M~4W>_lpZbiI$oU=CZE$5B&l_&%ZTE` zo9}zOGX?J6_)3iY6k!+oinfnwPG^}82u)ns_NJW9JN*`6x%54NqD+C-!YAkM*sjSq z#VJAw*52yGR+4G}6^$T;i7CoSjL-+w={QIQFXwF=yItPoQ^zaw^7dBKnf4RR$inxz zi9Sx8_PK|HsrQyOq|j>J9IJJTyMp2(P-~x)zPegnLwYc(q+Fujr9W%-`G>=%Ru%XH z>pgsL`e6C0Qg}WqW;PJ|XaMiIPH&e#8&e!aJ0QytFIE%%p$9#0_9*VYVp1RGEuDAF z1gQti2_A=Rsy^G%-*S!8%%K>vj|3C@IXr+rBI@27rKXh=oV40$f$vQfl|4ll8+M51 zPQ0;_*UVfBA$Zr>McUEjY708b%d_|R)snG~uoOpYR#5Cd%9gNB#-;*t+?uiMKPIt;@22fS}@9cFPuIb z_DLZuUxMZDoOJqwxC|!T7voDBA10UQ)^6k6oBCYeHQGR&tuEnxpXzx@Zit^;3^ynn z$-Vt_z_U;MZnp1ZGInp;pBP{v*s-j~oVamgF5B%QEWy1~!c_qx$+wu^*Dqbj*Q6%a&2yk%19h`VH&2c#^@14vgF`J=@vgF_vvS>}HJG6teiBu7Nc|1A zzt3Z3IGv=}ROJoSc}&`0;?=X{u2jJ*Io=Up;3aXSIe%Z9bGF5uefV{2^lU&}T8?6^RO^aOUY}&H3Y}Of?^yQbv zBkP`0KYE7&5uvwN24H-rX%}bj-gTu=426Bc8?V9&GIv(0*Qn*r`kG|>)PE_YsT+Mw zQ?hJ^x?;f|bzmYl$9)Ff)i^YZxZ8m#Fx;#O_ae>D!xP%ka2$WEbeOZ zb!ARxS7{?lk`B_Rbo8|NYL)?Tiyps{E1#puEc zb4_&`7#}6g_2u{Ptzi*YyDGkydb%y5QAU|mmgPyQ8&27(@y_G1cesBU!}Cnsal5vA zrCkCp_l9YlE4QbvJ+u$_(Uxu$$E!Cv!grl9Bs9NyiVzr3=s$e$U*G_F0x=->KeF?mav-Jre-@#l01*E%MgNFG z{|hkeJ&l1?@Mp;X2^f|~g74~sK4~$QMgTkAoC<&#;JC+E8)YnSYId~~Xm4|Yfy7e( zR!yh+(&=q_rfDnTWLU~dRWWDsX9)&?!rD)Kn_*fMsh(SQQ&Xu7HJI*L`s%BJGQomq zPSF%5kX45>^U~U{aFU5U@eo?7p~RCV7``V&hpYKpYfYZ(Sx(Tz5suij69xun4Qw5_ zoC3C9IW2!{+!pZY(!y0GTxZskrTqOY1+I~ezN>2DoFg1-N5T965-|J}^u|9}ML5Ha zA~;oF`7eiFlvUhiT>YKIDXxuZd;nP%tP(Q>hFZkjKK34}2i>2FoWct1LbCkM zu3)aU1J@`y?Rdh860m{ASsy$g8h=K9xR3Nl)`K8HY#M_0v)YnX-oI_?%UXztzhdlIo07L z%Z;yYQ(O}F3wnZVow{A6A8V9TZmRe$J*Wi>nq#3zbGrS;^ zAX)zA4{otp36ft`c~6i|!xh@31Vp!L7mfdZAO}O;-mn)>!ELN;AhuUx+D{h}Y((mk zQ-3DV3>~cZB+%Zen17lu=pW+5WII|`YlqKVY?^@n#Vybk)|I0YzUak+qP`W>>1w?$ za$!yD6nyRSy)nv7gli_Iu`IsdDn*E9{rLAgK1t3F4}sRL<`fR&nSO$M0e=Px-RZYC z-}p01HnDiq{>c!%k++LvQ`AvQdv?I%-Ql;gp+}-L;J+Pn?tUEBh^l@_sYABL_G-{i z+8#Zpt|Ro@US?u=n~_IsC`*wiRMXzIYCCh;NtOH~*pXwNv-U4Gx!grZ)4T1d%)i#P z;u?S8Xz72I_R5KeZbu`#9a6*^LEuoTB6D&cWESj02+0#SsDn{j21@Plby9)Uw7o&U zrKN0Yc~f5!a19fb&FOHNBr+ZPbl$SqWe zwdMQH77)[^<]+") + +# GHA does shallow clones, so need the next 2 commands to have the gh-pages branch +git remote set-branches origin 'gh-pages' +git fetch -v + +git checkout gh-pages +mkdir -p $RELEASE_VERSION/htmlsingle +cp target/generated-docs/index.html $RELEASE_VERSION/htmlsingle +mkdir -p $RELEASE_VERSION/api +cp -r target/site/apidocs/* $RELEASE_VERSION/api/ +git add $RELEASE_VERSION/ + +if [[ $LATEST == "true" ]] + then + if [[ $RELEASE_VERSION == *[RCM]* ]] + then + DOC_DIR="milestone" + elif [[ $RELEASE_VERSION == *SNAPSHOT* ]] + then + DOC_DIR="snapshot" + else + DOC_DIR="stable" + fi + + mkdir -p $DOC_DIR/htmlsingle + cp target/generated-docs/index.html $DOC_DIR/htmlsingle + mkdir -p $DOC_DIR/api + cp -r target/site/apidocs/* $DOC_DIR/api/ + git add $DOC_DIR/ + +fi + +git commit -m "$MESSAGE" +git push origin gh-pages +git checkout main diff --git a/.ci/ubuntu/enabled_plugins b/.ci/ubuntu/enabled_plugins new file mode 100644 index 0000000..2e81f16 --- /dev/null +++ b/.ci/ubuntu/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_auth_mechanism_ssl,rabbitmq_management,rabbitmq_stream,rabbitmq_stream_management,rabbitmq_top]. diff --git a/.ci/ubuntu/gha-log-check.sh b/.ci/ubuntu/gha-log-check.sh new file mode 100755 index 0000000..fef23a8 --- /dev/null +++ b/.ci/ubuntu/gha-log-check.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -o errexit +set -o pipefail +set -o xtrace +set -o nounset + +readonly docker_name_prefix='rabbitmq-amqp-go-client' + +declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq" + +if docker logs "$rabbitmq_docker_name" | grep -iF inet_error +then + echo '[ERROR] found inet_error in RabbitMQ logs' 1>&2 + exit 1 +fi diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh new file mode 100755 index 0000000..646f930 --- /dev/null +++ b/.ci/ubuntu/gha-setup.sh @@ -0,0 +1,180 @@ +#!/usr/bin/env bash + +set -o errexit +set -o pipefail +set -o xtrace + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +readonly script_dir +echo "[INFO] script_dir: '$script_dir'" + +if [[ $3 == 'arm' ]] +then + readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}" +else + readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}" +fi + + +readonly docker_name_prefix='rabbitmq-amqp-go-client' +readonly docker_network_name="$docker_name_prefix-network" + +if [[ ! -v GITHUB_ACTIONS ]] +then + GITHUB_ACTIONS='false' +fi + +if [[ -d $GITHUB_WORKSPACE ]] +then + echo "[INFO] GITHUB_WORKSPACE is set: '$GITHUB_WORKSPACE'" +else + GITHUB_WORKSPACE="$(cd "$script_dir/../.." && pwd)" + echo "[INFO] set GITHUB_WORKSPACE to: '$GITHUB_WORKSPACE'" +fi + +if [[ $1 == 'toxiproxy' ]] +then + readonly run_toxiproxy='true' +else + readonly run_toxiproxy='false' +fi + +if [[ $2 == 'pull' ]] +then + readonly docker_pull_args='--pull always' +else + readonly docker_pull_args='' +fi + +if [[ $1 == 'stop' ]] +then + docker stop "$rabbitmq_docker_name" + docker stop "$toxiproxy_docker_name" + exit 0 +fi + +set -o nounset + +declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq" +declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy" + +function start_toxiproxy +{ + if [[ $run_toxiproxy == 'true' ]] + then + # sudo ss -4nlp + echo "[INFO] starting Toxiproxy server docker container" + docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running" + # shellcheck disable=SC2086 + docker run --detach $docker_pull_args \ + --name "$toxiproxy_docker_name" \ + --hostname "$toxiproxy_docker_name" \ + --publish 8474:8474 \ + --publish 55670-55680:55670-55680 \ + --network "$docker_network_name" \ + 'ghcr.io/shopify/toxiproxy:latest' + fi +} + +function start_rabbitmq +{ + echo "[INFO] starting RabbitMQ server docker container" + chmod 0777 "$GITHUB_WORKSPACE/.ci/ubuntu/log" + docker rm --force "$rabbitmq_docker_name" 2>/dev/null || echo "[INFO] $rabbitmq_docker_name was not running" + # shellcheck disable=SC2086 + docker run --detach $docker_pull_args \ + --name "$rabbitmq_docker_name" \ + --hostname "$rabbitmq_docker_name" \ + --publish 5671:5671 \ + --publish 5672:5672 \ + --publish 15672:15672 \ + --network "$docker_network_name" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/enabled_plugins:/etc/rabbitmq/enabled_plugins" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" \ + --volume "$GITHUB_WORKSPACE/.ci/certs:/etc/rabbitmq/certs:ro" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/log:/var/log/rabbitmq" \ + "$rabbitmq_image" +} + +function wait_rabbitmq +{ + set +o errexit + set +o xtrace + + declare -i count=12 + while (( count > 0 )) && [[ "$(docker inspect --format='{{.State.Running}}' "$rabbitmq_docker_name")" != 'true' ]] + do + echo '[WARNING] RabbitMQ container is not yet running...' + sleep 5 + (( count-- )) + done + + declare -i count=12 + while (( count > 0 )) && ! docker exec "$rabbitmq_docker_name" epmd -names | grep -F 'name rabbit' + do + echo '[WARNING] epmd is not reporting rabbit name just yet...' + sleep 5 + (( count-- )) + done + + set -o xtrace + + docker exec "$rabbitmq_docker_name" rabbitmqctl await_startup + docker exec "$rabbitmq_docker_name" rabbitmq-diagnostics erlang_version + docker exec "$rabbitmq_docker_name" rabbitmqctl version + + set -o errexit +} + +function get_rabbitmq_id +{ + local rabbitmq_docker_id + rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")" + echo "[INFO] '$rabbitmq_docker_name' docker id is '$rabbitmq_docker_id'" + if [[ -v GITHUB_OUTPUT ]] + then + if [[ -f $GITHUB_OUTPUT ]] + then + echo "[INFO] GITHUB_OUTPUT file: '$GITHUB_OUTPUT'" + fi + echo "id=$rabbitmq_docker_id" >> "$GITHUB_OUTPUT" + fi +} + +function install_ca_certificate +{ + set +o errexit + hostname + hostname -s + hostname -f + openssl version + openssl version -d + set -o errexit + + if [[ $GITHUB_ACTIONS == 'true' ]] + then + readonly openssl_store_dir='/usr/lib/ssl/certs' + sudo cp -vf "$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem" "$openssl_store_dir" + sudo ln -vsf "$openssl_store_dir/ca_certificate.pem" "$openssl_store_dir/$(openssl x509 -hash -noout -in $openssl_store_dir/ca_certificate.pem).0" + else + echo "[WARNING] you must install '$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem' manually into your trusted root store" + fi + + openssl s_client -connect localhost:5671 \ + -CAfile "$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem" \ + -cert "$GITHUB_WORKSPACE/.ci/certs/client_localhost_certificate.pem" \ + -key "$GITHUB_WORKSPACE/.ci/certs/client_localhost_key.pem" \ + -pass pass:grapefruit < /dev/null +} + +docker network create "$docker_network_name" || echo "[INFO] network '$docker_network_name' is already created" + +start_toxiproxy + +start_rabbitmq + +wait_rabbitmq + +get_rabbitmq_id + +install_ca_certificate diff --git a/.ci/ubuntu/log/.gitkeep b/.ci/ubuntu/log/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/.ci/ubuntu/rabbitmq.conf b/.ci/ubuntu/rabbitmq.conf new file mode 100644 index 0000000..829dcec --- /dev/null +++ b/.ci/ubuntu/rabbitmq.conf @@ -0,0 +1,26 @@ +loopback_users = none +loopback_users.guest = true + +log.console = true +log.console.level = debug +log.file = /var/log/rabbitmq/rabbitmq.log +log.file.level = debug +log.exchange = false + +listeners.tcp.default = 5672 +listeners.ssl.default = 5671 +reverse_dns_lookups = false + +deprecated_features.permit.amqp_address_v1 = false + +ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem +ssl_options.certfile = /etc/rabbitmq/certs/server_localhost_certificate.pem +ssl_options.keyfile = /etc/rabbitmq/certs/server_localhost_key.pem +ssl_options.verify = verify_peer +ssl_options.password = grapefruit +ssl_options.depth = 1 +ssl_options.fail_if_no_peer_cert = false + +auth_mechanisms.1 = PLAIN +auth_mechanisms.2 = ANONYMOUS +auth_mechanisms.3 = EXTERNAL diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml new file mode 100644 index 0000000..7284091 --- /dev/null +++ b/.github/workflows/build-test.yaml @@ -0,0 +1,33 @@ +name: Test against supported go-version + +on: + - workflow_call + +jobs: + build-ubuntu: + runs-on: ubuntu-latest + strategy: + fail-fast: true + matrix: + go: [ '1.22'] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + id: setup_go + with: + go-version: ${{ matrix.go }} + check-latest: true + - name: Start RabbitMQ + id: start-rabbitmq + run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh + - name: Test + timeout-minutes: 15 + run: make test + - name: Check for errors in RabbitMQ logs + run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh + - name: Maybe upload RabbitMQ logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: rabbitmq-logs-integration-ubuntu + path: ${{ github.workspace }}/.ci/ubuntu/log/ diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml new file mode 100644 index 0000000..5289b53 --- /dev/null +++ b/.github/workflows/main.yaml @@ -0,0 +1,11 @@ +name: rabbitmq-amqp-go-client + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + call-build-test: + uses: ./.github/workflows/build-test.yaml \ No newline at end of file diff --git a/.github/workflows/publish-nuget.yaml b/.github/workflows/publish-nuget.yaml new file mode 100644 index 0000000..1fea9db --- /dev/null +++ b/.github/workflows/publish-nuget.yaml @@ -0,0 +1,27 @@ +name: publish-nuget + +on: + workflow_call: + secrets: + NUGET_API_KEY: + required: true + +jobs: + publish-nuget: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: true + - uses: actions/cache@v4 + with: + path: | + ~/.nuget/packages + ~/.local/share/NuGet/v3-cache + key: ${{ runner.os }}-v0-nuget-${{ hashFiles('**/*.csproj') }} + restore-keys: | + ${{ runner.os }}-v0-nuget- + - name: Build (Release) + run: dotnet build ${{ github.workspace }}/Build.csproj --configuration=Release --property CI=true + - name: Publish to NuGet + run: dotnet nuget push --skip-duplicate --api-key ${{ secrets.NUGET_API_KEY }} --source 'https://api.nuget.org/v3/index.json' ${{ github.workspace }}/packages/RabbitMQ.AMQP.Client.*.nupkg diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml new file mode 100644 index 0000000..c01e545 --- /dev/null +++ b/.github/workflows/publish.yaml @@ -0,0 +1,15 @@ +name: publish rabbitmq-dotnet-client + +on: + release: + types: + # https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#release + - published + +jobs: + call-build-test: + uses: ./.github/workflows/build-test.yaml + call-publish-nuget: + uses: ./.github/workflows/publish-nuget.yaml + needs: call-build-test + secrets: inherit diff --git a/.gitignore b/.gitignore index 6f72f89..eecfdeb 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,7 @@ go.work.sum # env file .env +.idea/ +coverage.txt +.DS_Store +.ci/ubuntu/log/rabbitmq.log diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d655c81 --- /dev/null +++ b/Makefile @@ -0,0 +1,21 @@ +all: format vet test + +format: + go fmt ./... + +vet: + go vet ./rabbitmq_amqp + +test: + cd rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \ + --randomize-all --randomize-suites \ + --cover --coverprofile=coverage.txt --covermode=atomic \ + --race + + + +rabbitmq-server-start-arm: + ./.ci/ubuntu/gha-setup.sh start pull arm + +rabbitmq-server-stop: + ./.ci/ubuntu/gha-setup.sh stop diff --git a/README.md b/README.md index f90ce17..9cea4e6 100644 --- a/README.md +++ b/README.md @@ -1 +1,13 @@ -# rabbitmq-amqp-go-client \ No newline at end of file +# RabbitMQ AMQP 1.0 .Golang Client + +This library is in early stages of development. It is meant to be used with RabbitMQ 4.0. + +## How to Run + +- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker. +- `make test` to run the tests +- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop` + +## Getting Started + +You can find an example in: `examples/getting_started` diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go new file mode 100644 index 0000000..b7d74df --- /dev/null +++ b/examples/getting_started/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "bufio" + "context" + "fmt" + mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" + "os" +) + +func main() { + fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n") + + chStatusChanged := make(chan *mq.StatusChanged, 1) + + go func(ch chan *mq.StatusChanged) { + for statusChanged := range ch { + fmt.Printf("Status changed from %d to %d\n", statusChanged.From, statusChanged.To) + } + }(chStatusChanged) + + amqpConnection := mq.NewAmqpConnection() + amqpConnection.NotifyStatusChange(chStatusChanged) + err := amqpConnection.Open(context.Background(), mq.NewConnectionSettings()) + if err != nil { + return + } + + fmt.Printf("AMQP Connection opened.\n") + management := amqpConnection.Management() + queueSpec := management.Queue("getting_started_queue"). + QueueType(mq.QueueType{Type: mq.Quorum}). + MaxLengthBytes(mq.CapacityGB(1)). + DeadLetterExchange("dead-letter-exchange"). + DeadLetterRoutingKey("dead-letter-routing-key") + queueInfo, err := queueSpec.Declare(context.Background()) + if err != nil { + return + } + fmt.Printf("Queue %s created.\n", queueInfo.GetName()) + err = queueSpec.Delete(context.Background()) + if err != nil { + return + } + fmt.Printf("Queue %s deleted.\n", queueInfo.GetName()) + + fmt.Println("Press any key to stop ") + reader := bufio.NewReader(os.Stdin) + _, _ = reader.ReadString('\n') + + err = amqpConnection.Close(context.Background()) + if err != nil { + return + } + fmt.Printf("AMQP Connection closed.\n") + close(chStatusChanged) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..135f6cf --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module github.com/rabbitmq/rabbitmq-amqp-go-client + +go 1.22.0 + +require ( + github.com/Azure/go-amqp v0.0.0-00010101000000-000000000000 + github.com/onsi/ginkgo/v2 v2.20.2 + github.com/onsi/gomega v1.34.2 +) + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect + github.com/google/uuid v1.6.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + golang.org/x/tools v0.24.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/Azure/go-amqp => github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..803082e --- /dev/null +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48 h1:etxEtd7qkhJD34gpQesPbZuMJrqkc+ZOXqR3diVfGWs= +github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSFBy+X1V0o+l+8NF1avt4HWl7cA= +github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= +github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= +github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= +github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go new file mode 100644 index 0000000..2cf2907 --- /dev/null +++ b/rabbitmq_amqp/amqp_connection.go @@ -0,0 +1,179 @@ +package rabbitmq_amqp + +import ( + "context" + "crypto/tls" + "fmt" + "github.com/Azure/go-amqp" +) + +type ConnectionSettings struct { + host string + port int + user string + password string + virtualHost string + scheme string + containerId string + useSsl bool + tlsConfig *tls.Config +} + +func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings { + c.tlsConfig = config + return c +} + +func (c *ConnectionSettings) GetTlsConfig() *tls.Config { + return c.tlsConfig +} + +func (c *ConnectionSettings) Port(port int) IConnectionSettings { + c.port = port + return c +} + +func (c *ConnectionSettings) User(userName string) IConnectionSettings { + + c.user = userName + return c +} + +func (c *ConnectionSettings) Password(password string) IConnectionSettings { + c.password = password + return c +} + +func (c *ConnectionSettings) VirtualHost(virtualHost string) IConnectionSettings { + c.virtualHost = virtualHost + return c +} + +func (c *ConnectionSettings) ContainerId(containerId string) IConnectionSettings { + c.containerId = containerId + return c +} + +func (c *ConnectionSettings) GetHost() string { + return c.host +} + +func (c *ConnectionSettings) Host(hostName string) IConnectionSettings { + c.host = hostName + return c + +} + +func (c *ConnectionSettings) GetPort() int { + return c.port +} + +func (c *ConnectionSettings) GetUser() string { + return c.user +} + +func (c *ConnectionSettings) GetPassword() string { + return c.password +} + +func (c *ConnectionSettings) GetVirtualHost() string { + return c.virtualHost +} + +func (c *ConnectionSettings) GetScheme() string { + return c.scheme +} + +func (c *ConnectionSettings) GetContainerId() string { + return c.containerId +} + +func (c *ConnectionSettings) UseSsl(value bool) IConnectionSettings { + c.useSsl = value + if value { + c.scheme = "amqps" + } else { + c.scheme = "amqp" + } + return c +} + +func (c *ConnectionSettings) IsSsl() bool { + return c.useSsl +} + +func (c *ConnectionSettings) BuildAddress() string { + return c.scheme + "://" + c.host + ":" + fmt.Sprint(c.port) +} + +func NewConnectionSettings() IConnectionSettings { + return &ConnectionSettings{ + host: "localhost", + port: 5672, + user: "guest", + password: "guest", + virtualHost: "/", + scheme: "amqp", + containerId: "amqp-go-client", + useSsl: false, + tlsConfig: nil, + } +} + +type AmqpConnection struct { + Connection *amqp.Conn + management IManagement + lifeCycle *LifeCycle +} + +func (a *AmqpConnection) Management() IManagement { + return a.management +} + +func NewAmqpConnection() IConnection { + return &AmqpConnection{ + management: NewAmqpManagement(), + lifeCycle: NewLifeCycle(), + } +} + +func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error { + // TODO: add support for other SASL types + sASLType := amqp.SASLTypeAnonymous() + + conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{ + ContainerID: connectionSettings.GetContainerId(), + SASLType: sASLType, + HostName: connectionSettings.GetVirtualHost(), + TLSConfig: connectionSettings.GetTlsConfig(), + }) + if err != nil { + return err + } + a.Connection = conn + a.lifeCycle.SetStatus(Open) + + err = a.Management().Open(ctx, a) + if err != nil { + return err + } + return nil +} + +func (a *AmqpConnection) Close(ctx context.Context) error { + err := a.Management().Close(ctx) + if err != nil { + return err + } + err = a.Connection.Close() + a.lifeCycle.SetStatus(Closed) + return err +} + +func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) { + a.lifeCycle.chStatusChanged = channel +} + +func (a *AmqpConnection) GetStatus() int { + return a.lifeCycle.Status() +} diff --git a/rabbitmq_amqp/amqp_connection_test.go b/rabbitmq_amqp/amqp_connection_test.go new file mode 100644 index 0000000..34b2d7e --- /dev/null +++ b/rabbitmq_amqp/amqp_connection_test.go @@ -0,0 +1,96 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "time" +) + +var _ = Describe("AMQP Connection Test", func() { + It("AMQP Connection should success", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + }) + + It("AMQP Connection should fail due of wrong port", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + connectionSettings.Host("localhost").Port(1234) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Connection should fail due of wrong host", func() { + + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + connectionSettings.Host("wronghost").Port(5672) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Connection should fail due of context cancelled", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + cancel() + err := amqpConnection.Open(ctx, NewConnectionSettings()) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Connection should receive events ", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + ch := make(chan *StatusChanged, 1) + amqpConnection.NotifyStatusChange(ch) + err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) + Expect(err).To(BeNil()) + recv := <-ch + Expect(recv).NotTo(BeNil()) + Expect(recv.From).To(Equal(Closed)) + Expect(recv.To).To(Equal(Open)) + + err = amqpConnection.Close(context.Background()) + Expect(err).To(BeNil()) + recv = <-ch + Expect(recv).NotTo(BeNil()) + + Expect(recv.From).To(Equal(Open)) + Expect(recv.To).To(Equal(Closed)) + + }) + + //It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() { + // amqpConnection := NewAmqpConnection() + // Expect(amqpConnection).NotTo(BeNil()) + // Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + // + // connectionSettings := NewConnectionSettings(). + // UseSsl(true).Port(5671).TlsConfig(&tls.Config{ + // //ServerName: "localhost", + // InsecureSkipVerify: true, + // }) + // Expect(connectionSettings).NotTo(BeNil()) + // Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + // err := amqpConnection.Open(context.TODO(), connectionSettings) + // Expect(err).To(BeNil()) + //}) + +}) diff --git a/rabbitmq_amqp/amqp_managent.go b/rabbitmq_amqp/amqp_managent.go new file mode 100644 index 0000000..2994191 --- /dev/null +++ b/rabbitmq_amqp/amqp_managent.go @@ -0,0 +1,222 @@ +package rabbitmq_amqp + +import ( + "context" + "errors" + "fmt" + "github.com/Azure/go-amqp" + "github.com/google/uuid" + "strconv" + "time" +) + +var PreconditionFailed = errors.New("precondition Failed") + +type AmqpManagement struct { + session *amqp.Session + sender *amqp.Sender + receiver *amqp.Receiver + lifeCycle *LifeCycle + cancel context.CancelFunc +} + +func NewAmqpManagement() *AmqpManagement { + return &AmqpManagement{ + lifeCycle: NewLifeCycle(), + } + +} + +func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { + if a.receiver == nil { + prop := make(map[string]any) + prop["paired"] = true + opts := &amqp.ReceiverOptions{ + DynamicAddress: false, + Name: linkPairName, + Properties: prop, + RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(), + SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), + TargetAddress: managementNodeAddress, + ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, + Credit: 100, + } + receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts) + if err != nil { + return err + } + a.receiver = receiver + return nil + } + return nil +} + +//func (a *AmqpManagement) processMessages(ctx context.Context) error { +// +// go func() { +// +// for a.GetStatus() == Open { +// msg, err := a.receiver.Receive(ctx, nil) // blocking call +// if err != nil { +// fmt.Printf("Exiting processMessages %s\n", err) +// return +// } +// +// if msg != nil { +// a.receiver.AcceptMessage(ctx, msg) +// } +// } +// +// fmt.Printf("Exiting processMessages\n") +// }() + +//return nil +//} + +func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error { + if a.sender == nil { + prop := make(map[string]any) + prop["paired"] = true + opts := &amqp.SenderOptions{ + DynamicAddress: false, + ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, + ExpiryTimeout: 0, + Name: linkPairName, + Properties: prop, + SettlementMode: amqp.SenderSettleModeSettled.Ptr(), + RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(), + SourceAddress: managementNodeAddress, + } + sender, err := a.session.NewSender(ctx, managementNodeAddress, opts) + if err != nil { + return err + } + + a.sender = sender + return nil + } + return nil +} + +func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error { + session, err := connection.(*AmqpConnection).Connection.NewSession(ctx, nil) + if err != nil { + return err + } + a.session = session + err = a.ensureSenderLink(ctx) + + if err != nil { + return err + } + + time.Sleep(500 * time.Millisecond) + err = a.ensureReceiverLink(ctx) + time.Sleep(500 * time.Millisecond) + if err != nil { + return err + } + //if ctx.Err() != nil { + // // start processing messages. Here we pass a context that will be closed + // // when the receiver session is closed. + // // we won't expose To the user since the user will call Close + // // and the processing _must_ be running in the background + // // for the management session life. + // //err = a.processMessages(context.Background()) + // //if err != nil { + // // return err + // //} + //} + a.lifeCycle.SetStatus(Open) + return ctx.Err() +} + +func (a *AmqpManagement) Close(ctx context.Context) error { + _ = a.sender.Close(ctx) + _ = a.receiver.Close(ctx) + err := a.session.Close(ctx) + a.lifeCycle.SetStatus(Closed) + return err +} + +func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, + expectedResponseCodes []int) (map[string]any, error) { + + return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes) + +} + +func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error { + + for _, code := range expectedResponseCodes { + if code == responseCode { + return nil + } + } + return PreconditionFailed +} + +func (a *AmqpManagement) request(ctx context.Context, id string, body any, path string, method string, + expectedResponseCodes []int) (map[string]any, error) { + amqpMessage := amqp.NewMessageWithValue(body) + s := commandReplyTo + amqpMessage.Properties = &amqp.MessageProperties{ + ReplyTo: &s, + To: &path, + Subject: &method, + MessageID: &id, + } + opts := &amqp.SendOptions{Settled: true} + err := a.sender.Send(ctx, amqpMessage, opts) + if err != nil { + return make(map[string]any), err + } + msg, err := a.receiver.Receive(ctx, nil) + if err != nil { + return make(map[string]any), err + } + err = a.receiver.AcceptMessage(ctx, msg) + if err != nil { + return nil, err + } + if msg.Properties == nil { + return make(map[string]any), fmt.Errorf("expected properties in the message") + } + + if msg.Properties.CorrelationID == nil { + return make(map[string]any), fmt.Errorf("expected correlation id in the message") + } + + if msg.Properties.CorrelationID != id { + return make(map[string]any), fmt.Errorf("expected correlation id %s got %s", id, msg.Properties.CorrelationID) + } + switch msg.Value.(type) { + case map[string]interface{}: + return msg.Value.(map[string]any), nil + } + + i, _ := strconv.Atoi(*msg.Properties.Subject) + + err = a.validateResponseCode(i, expectedResponseCodes) + if err != nil { + return nil, err + } + + return make(map[string]any), nil +} + +func (a *AmqpManagement) Queue(queueName string) IQueueSpecification { + return newAmqpQueue(a, queueName) +} + +func (a *AmqpManagement) QueueClientName() IQueueSpecification { + return newAmqpQueue(a, "") +} + +func (a *AmqpManagement) NotifyStatusChange(channel chan *StatusChanged) { + a.lifeCycle.chStatusChanged = channel +} + +func (a *AmqpManagement) GetStatus() int { + return a.lifeCycle.Status() +} diff --git a/rabbitmq_amqp/amqp_managent_test.go b/rabbitmq_amqp/amqp_managent_test.go new file mode 100644 index 0000000..d2b7521 --- /dev/null +++ b/rabbitmq_amqp/amqp_managent_test.go @@ -0,0 +1,70 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "time" +) + +var _ = Describe("Management tests", func() { + + It("AMQP Management should fail due of context cancelled", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + err := amqpConnection.Open(context.Background(), NewConnectionSettings()) + Expect(err).To(BeNil()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + cancel() + err = amqpConnection.Management().Open(ctx, amqpConnection) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Management should receive events ", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + ch := make(chan *StatusChanged, 1) + amqpConnection.Management().NotifyStatusChange(ch) + err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) + Expect(err).To(BeNil()) + recv := <-ch + Expect(recv).NotTo(BeNil()) + Expect(recv.From).To(Equal(Closed)) + Expect(recv.To).To(Equal(Open)) + + err = amqpConnection.Close(context.Background()) + Expect(err).To(BeNil()) + recv = <-ch + Expect(recv).NotTo(BeNil()) + + Expect(recv.From).To(Equal(Open)) + Expect(recv.To).To(Equal(Closed)) + + }) + + It("Request", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + + management := amqpConnection.Management() + kv := make(map[string]any) + kv["durable"] = true + kv["auto_delete"] = false + _queueArguments := make(map[string]any) + _queueArguments["x-queue-type"] = "quorum" + kv["arguments"] = _queueArguments + path := "/queues/test" + result, err := management.Request(context.TODO(), kv, path, "PUT", []int{200}) + Expect(err).To(BeNil()) + Expect(result).NotTo(BeNil()) + Expect(management.Close(context.TODO())).To(BeNil()) + }) +}) diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go new file mode 100644 index 0000000..9199347 --- /dev/null +++ b/rabbitmq_amqp/amqp_queue.go @@ -0,0 +1,178 @@ +package rabbitmq_amqp + +import ( + "context" +) + +type AmqpQueueInfo struct { + name string + isDurable bool + isAutoDelete bool + isExclusive bool + leader string + replicas []string + arguments map[string]any + queueType TQueueType +} + +func (a *AmqpQueueInfo) GetLeader() string { + return a.leader +} + +func (a *AmqpQueueInfo) GetReplicas() []string { + return a.replicas +} + +func newAmqpQueueInfo(response map[string]any) IQueueInfo { + return &AmqpQueueInfo{ + name: response["name"].(string), + isDurable: response["durable"].(bool), + isAutoDelete: response["auto_delete"].(bool), + isExclusive: response["exclusive"].(bool), + queueType: TQueueType(response["type"].(string)), + leader: response["leader"].(string), + replicas: response["replicas"].([]string), + arguments: response["arguments"].(map[string]any), + } +} + +func (a *AmqpQueueInfo) IsDurable() bool { + return a.isDurable +} + +func (a *AmqpQueueInfo) IsAutoDelete() bool { + return a.isAutoDelete +} + +func (a *AmqpQueueInfo) Exclusive() bool { + return a.isExclusive +} + +func (a *AmqpQueueInfo) Type() TQueueType { + return a.queueType +} + +func (a *AmqpQueueInfo) GetName() string { + return a.name +} + +func (a *AmqpQueueInfo) GetArguments() map[string]any { + return a.arguments +} + +type AmqpQueue struct { + management *AmqpManagement + queueArguments map[string]any + isExclusive bool + isAutoDelete bool + name string +} + +func (a *AmqpQueue) DeadLetterExchange(dlx string) IQueueSpecification { + a.queueArguments["x-dead-letter-exchange"] = dlx + return a +} + +func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) IQueueSpecification { + a.queueArguments["x-dead-letter-routing-key"] = dlrk + return a +} + +func (a *AmqpQueue) MaxLengthBytes(length int64) IQueueSpecification { + a.queueArguments["max-length-bytes"] = length + return a +} + +func (a *AmqpQueue) QueueType(queueType QueueType) IQueueSpecification { + a.queueArguments["x-queue-type"] = queueType.String() + return a +} + +func (a *AmqpQueue) GetQueueType() TQueueType { + if a.queueArguments["x-queue-type"] == nil { + return Classic + } + return TQueueType(a.queueArguments["x-queue-type"].(string)) +} + +func (a *AmqpQueue) Exclusive(isExclusive bool) IQueueSpecification { + a.isExclusive = isExclusive + return a +} + +func (a *AmqpQueue) IsExclusive() bool { + return a.isExclusive +} + +func (a *AmqpQueue) AutoDelete(isAutoDelete bool) IQueueSpecification { + a.isAutoDelete = isAutoDelete + return a +} + +func (a *AmqpQueue) IsAutoDelete() bool { + return a.isAutoDelete +} + +func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecification { + return &AmqpQueue{management: management, + name: queueName, + queueArguments: make(map[string]any)} +} + +func (a *AmqpQueue) validate() error { + + if a.queueArguments["max-length-bytes"] != nil { + + err := validatePositive("max length", a.queueArguments["max-length-bytes"].(int64)) + if err != nil { + return err + } + } + return nil +} + +func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { + + if Quorum == a.GetQueueType() || + Stream == a.GetQueueType() { + // mandatory arguments for quorum queues and streams + a.Exclusive(false).AutoDelete(false) + } + if err := a.validate(); err != nil { + return nil, err + } + + if a.name == "" { + a.name = GenerateNameWithDefaultPrefix() + } + + path := queuePath(a.name) + kv := make(map[string]any) + kv["durable"] = true + kv["auto_delete"] = a.isAutoDelete + kv["exclusive"] = a.isExclusive + kv["arguments"] = a.queueArguments + response, err := a.management.Request(ctx, kv, path, commandPut, []int{200}) + if err != nil { + return nil, err + } + return newAmqpQueueInfo(response), nil +} + +func (a *AmqpQueue) Delete(ctx context.Context) error { + path := queuePath(a.name) + _, err := a.management.Request(ctx, nil, path, commandDelete, []int{200}) + if err != nil { + return err + } + return nil +} + +func (a *AmqpQueue) Name(queueName string) IQueueSpecification { + a.name = queueName + return a +} + +func (a *AmqpQueue) GetName() string { + return a.name +} diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go new file mode 100644 index 0000000..5e2d79c --- /dev/null +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -0,0 +1,153 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("AMQP Queue test ", func() { + + var connection IConnection + var management IManagement + BeforeEach(func() { + connection = NewAmqpConnection() + Expect(connection).NotTo(BeNil()) + Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{})) + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := connection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + management = connection.Management() + + }) + + AfterEach(func() { + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Queue Declare With Response and Delete should success ", func() { + const queueName = "AMQP Queue Declare With Response and Delete should success" + queueSpec := management.Queue(queueName) + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeFalse()) + Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.Type()).To(Equal(Classic)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Queue Declare With Parameters and Delete should success ", func() { + const queueName = "AMQP Queue Declare With Parameters and Delete should success" + queueSpec := management.Queue(queueName).Exclusive(true). + AutoDelete(true). + QueueType(QueueType{Classic}). + MaxLengthBytes(CapacityGB(1)). + DeadLetterExchange("dead-letter-exchange"). + DeadLetterRoutingKey("dead-letter-routing-key") + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeTrue()) + Expect(queueInfo.Exclusive()).To(BeTrue()) + Expect(queueInfo.Type()).To(Equal(Classic)) + Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit")) + Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0)) + + Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-exchange", "dead-letter-exchange")) + Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key")) + Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000))) + + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Quorum Queue and Delete should success ", func() { + const queueName = "AMQP Declare Quorum Queue and Delete should success" + // Quorum queue will ignore Exclusive and AutoDelete settings + // since they are not supported by quorum queues + queueSpec := management.Queue(queueName). + Exclusive(true). + AutoDelete(true).QueueType(QueueType{Quorum}) + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeFalse()) + Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.Type()).To(Equal(Quorum)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Stream Queue and Delete should success ", func() { + const queueName = "AMQP Declare Stream Queue and Delete should success" + // Stream queue will ignore Exclusive and AutoDelete settings + // since they are not supported by quorum queues + queueSpec := management.Queue(queueName). + Exclusive(true). + AutoDelete(true).QueueType(QueueType{Stream}) + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.IsDurable()).To(BeTrue()) + Expect(queueInfo.IsAutoDelete()).To(BeFalse()) + Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.Type()).To(Equal(Stream)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Queue with invalid type should fail ", func() { + const queueName = "AMQP Declare Queue with invalid type should fail" + queueSpec := management.Queue(queueName). + QueueType(QueueType{Type: "invalid"}) + _, err := queueSpec.Declare(context.TODO()) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Declare Queue should fail with Precondition fail ", func() { + + // The first queue is declared as Classic and it should succeed + // The second queue is declared as Quorum and it should fail since it is already declared as Classic + const queueName = "AMQP Declare Queue should fail with Precondition fail" + queueSpec := management.Queue(queueName).QueueType(QueueType{Classic}) + _, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum}) + _, err = queueSpecFail.Declare(context.TODO()) + Expect(err).NotTo(BeNil()) + Expect(err).To(Equal(PreconditionFailed)) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Declare Queue should fail during validation", func() { + + const queueName = "AMQP Declare Queue should fail during validation" + queueSpec := management.Queue(queueName).MaxLengthBytes(-1) + _, err := queueSpec.Declare(context.TODO()) + Expect(err).NotTo(BeNil()) + Expect(err).To(HaveOccurred()) + }) + + It("AMQP Declare Queue should create client name queue", func() { + queueSpec := management.QueueClientName() + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(ContainSubstring("client.gen-")) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + +}) diff --git a/rabbitmq_amqp/common.go b/rabbitmq_amqp/common.go new file mode 100644 index 0000000..89ce280 --- /dev/null +++ b/rabbitmq_amqp/common.go @@ -0,0 +1,117 @@ +package rabbitmq_amqp + +import ( + "crypto/md5" + "encoding/base64" + "fmt" + "github.com/google/uuid" + "net/url" + "strings" +) + +type PercentCodec struct{} + +// Encode takes a string and returns its percent-encoded representation. +func (pc *PercentCodec) Encode(input string) string { + var encoded strings.Builder + + // Iterate over each character in the input string + for _, char := range input { + // Check if the character is an unreserved character (i.e., it doesn't need encoding) + if isUnreserved(char) { + encoded.WriteRune(char) // Append as is + } else { + // Encode character To %HH format + encoded.WriteString(fmt.Sprintf("%%%02X", char)) + } + } + + return encoded.String() +} + +// Decode takes a percent-encoded string and returns its decoded representation. +func (pc *PercentCodec) Decode(input string) (string, error) { + // Use url.QueryUnescape which properly decodes percent-encoded strings + decoded, err := url.QueryUnescape(input) + if err != nil { + return "", err + } + + return decoded, nil +} + +const ( + responseCode200 = 200 + responseCode201 = 201 + responseCode204 = 204 + responseCode409 = 409 + commandPut = "PUT" + commandGet = "GET" + commandPost = "POST" + commandDelete = "DELETE" + commandReplyTo = "$me" + managementNodeAddress = "/management" + linkPairName = "management-link-pair" +) + +const ( + Exchanges = "exchanges" + Key = "key" + Queues = "queues" + Bindings = "bindings" +) + +// isUnreserved checks if a character is an unreserved character in percent encoding +// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ +func isUnreserved(char rune) bool { + return (char >= 'A' && char <= 'Z') || + (char >= 'a' && char <= 'z') || + (char >= '0' && char <= '9') || + char == '-' || char == '.' || char == '_' || char == '~' +} + +func encodePathSegments(pathSegments string) string { + return (&PercentCodec{}).Encode(pathSegments) +} + +func queuePath(queueName string) string { + return "/" + Queues + "/" + encodePathSegments(queueName) +} + +func validatePositive(label string, value int64) error { + if value < 0 { + return fmt.Errorf("value for %s must be positive, got %d", label, value) + } + return nil +} + +//internal static string GenerateName(string prefix) +//{ +//string uuidStr = Guid.NewGuid().ToString(); +//byte[] uuidBytes = Encoding.ASCII.GetBytes(uuidStr); +//var md5 = MD5.Create(); +//byte[] digest = md5.ComputeHash(uuidBytes); +//return prefix + Convert.ToBase64String(digest) +//.Replace('+', '-') +//.Replace('/', '_') +//.Replace("=", ""); +//} + +func GenerateNameWithDefaultPrefix() string { + return GenerateName("client.gen-") +} + +// GenerateName generates a unique name with the given prefix +func GenerateName(prefix string) string { + + var uid = uuid.New() + var uuidBytes = []byte(uid.String()) + var _md5 = md5.New() + var digest = _md5.Sum(uuidBytes) + result := base64.StdEncoding.EncodeToString(digest) + result = strings.ReplaceAll(result, "+", "-") + result = strings.ReplaceAll(result, "/", "_") + result = strings.ReplaceAll(result, "=", "") + return prefix + result + +} diff --git a/rabbitmq_amqp/connection.go b/rabbitmq_amqp/connection.go new file mode 100644 index 0000000..39b1923 --- /dev/null +++ b/rabbitmq_amqp/connection.go @@ -0,0 +1,35 @@ +package rabbitmq_amqp + +import ( + "context" + "crypto/tls" +) + +type IConnectionSettings interface { + GetHost() string + Host(hostName string) IConnectionSettings + GetPort() int + Port(port int) IConnectionSettings + GetUser() string + User(userName string) IConnectionSettings + GetPassword() string + Password(password string) IConnectionSettings + GetVirtualHost() string + VirtualHost(virtualHost string) IConnectionSettings + GetScheme() string + GetContainerId() string + ContainerId(containerId string) IConnectionSettings + UseSsl(value bool) IConnectionSettings + IsSsl() bool + BuildAddress() string + TlsConfig(config *tls.Config) IConnectionSettings + GetTlsConfig() *tls.Config +} + +type IConnection interface { + Open(ctx context.Context, connectionSettings IConnectionSettings) error + Close(ctx context.Context) error + Management() IManagement + NotifyStatusChange(channel chan *StatusChanged) + GetStatus() int +} diff --git a/rabbitmq_amqp/converters.go b/rabbitmq_amqp/converters.go new file mode 100644 index 0000000..e7b2c0a --- /dev/null +++ b/rabbitmq_amqp/converters.go @@ -0,0 +1,79 @@ +package rabbitmq_amqp + +import ( + "errors" + "fmt" + "regexp" + "strconv" + "strings" +) + +const ( + UnitMb string = "mb" + UnitKb string = "kb" + UnitGb string = "gb" + UnitTb string = "tb" + kilobytesMultiplier = 1000 + megabytesMultiplier = 1000 * 1000 + gigabytesMultiplier = 1000 * 1000 * 1000 + terabytesMultiplier = 1000 * 1000 * 1000 * 1000 +) + +func CapacityBytes(value int64) int64 { + return int64(value) +} + +func CapacityKB(value int64) int64 { + return int64(value * kilobytesMultiplier) +} + +func CapacityMB(value int64) int64 { + return int64(value * megabytesMultiplier) +} + +func CapacityGB(value int64) int64 { + return int64(value * gigabytesMultiplier) +} + +func CapacityTB(value int64) int64 { + return int64(value * terabytesMultiplier) +} + +func CapacityFrom(value string) (int64, error) { + if value == "" || value == "0" { + return 0, nil + } + + match, err := regexp.Compile("^((kb|mb|gb|tb))") + if err != nil { + return 0, + errors.New(fmt.Sprintf("Capacity, invalid unit size format:%s", value)) + } + + foundUnitSize := strings.ToLower(value[len(value)-2:]) + + if match.MatchString(foundUnitSize) { + + size, err := strconv.Atoi(value[:len(value)-2]) + if err != nil { + return 0, errors.New(fmt.Sprintf("Capacity, Invalid number format: %s", value)) + } + switch foundUnitSize { + case UnitKb: + return CapacityKB(int64(size)), nil + + case UnitMb: + return CapacityMB(int64(size)), nil + + case UnitGb: + return CapacityGB(int64(size)), nil + + case UnitTb: + return CapacityTB(int64(size)), nil + } + + } + + return 0, + errors.New(fmt.Sprintf("Capacity, Invalid unit size format: %s", value)) +} diff --git a/rabbitmq_amqp/converters_test.go b/rabbitmq_amqp/converters_test.go new file mode 100644 index 0000000..504c835 --- /dev/null +++ b/rabbitmq_amqp/converters_test.go @@ -0,0 +1,55 @@ +package rabbitmq_amqp + +import ( + "fmt" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Converters", func() { + + It("Converter from number", func() { + Expect(CapacityBytes(100)).To(Equal(int64(100))) + Expect(CapacityKB(1)).To(Equal(int64(1000))) + Expect(CapacityMB(1)).To(Equal(int64(1000 * 1000))) + Expect(CapacityGB(1)).To(Equal(int64(1000 * 1000 * 1000))) + Expect(CapacityTB(1)).To(Equal(int64(1000 * 1000 * 1000 * 1000))) + }) + + It("Converter from string", func() { + v, err := CapacityFrom("1KB") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000))) + + v, err = CapacityFrom("1MB") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000 * 1000))) + + v, err = CapacityFrom("1GB") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000 * 1000 * 1000))) + + v, err = CapacityFrom("1tb") + Expect(err).NotTo(HaveOccurred()) + Expect(v).To(Equal(int64(1000 * 1000 * 1000 * 1000))) + }) + + It("Converter from string logError", func() { + v, err := CapacityFrom("10LL") + Expect(fmt.Sprintf("%s", err)). + To(ContainSubstring("Invalid unit size format")) + + v, err = CapacityFrom("aGB") + Expect(fmt.Sprintf("%s", err)). + To(ContainSubstring("Invalid number format")) + + v, err = CapacityFrom("") + Expect(v).To(Equal(int64(0))) + Expect(err).To(BeNil()) + + v, err = CapacityFrom("0") + Expect(v).To(Equal(int64(0))) + Expect(err).To(BeNil()) + }) + +}) diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go new file mode 100644 index 0000000..e373ee7 --- /dev/null +++ b/rabbitmq_amqp/entities.go @@ -0,0 +1,52 @@ +package rabbitmq_amqp + +import ( + "context" +) + +type TQueueType string + +const ( + Quorum TQueueType = "quorum" + Classic TQueueType = "classic" + Stream TQueueType = "stream" +) + +type QueueType struct { + Type TQueueType +} + +func (e QueueType) String() string { + return string(e.Type) +} + +type IEntityInfoSpecification[T any] interface { + Declare(ctx context.Context) (T, error) + Delete(ctx context.Context) error +} + +type IQueueSpecification interface { + GetName() string + Exclusive(isExclusive bool) IQueueSpecification + IsExclusive() bool + AutoDelete(isAutoDelete bool) IQueueSpecification + IsAutoDelete() bool + IEntityInfoSpecification[IQueueInfo] + QueueType(queueType QueueType) IQueueSpecification + GetQueueType() TQueueType + + MaxLengthBytes(length int64) IQueueSpecification + DeadLetterExchange(dlx string) IQueueSpecification + DeadLetterRoutingKey(dlrk string) IQueueSpecification +} + +type IQueueInfo interface { + GetName() string + IsDurable() bool + IsAutoDelete() bool + Exclusive() bool + Type() TQueueType + GetLeader() string + GetReplicas() []string + GetArguments() map[string]any +} diff --git a/rabbitmq_amqp/life_cycle.go b/rabbitmq_amqp/life_cycle.go new file mode 100644 index 0000000..10b4dd5 --- /dev/null +++ b/rabbitmq_amqp/life_cycle.go @@ -0,0 +1,53 @@ +package rabbitmq_amqp + +import "sync" + +const ( + Open = iota + Reconnecting = iota + Closing = iota + Closed = iota +) + +type StatusChanged struct { + From int + To int +} + +type LifeCycle struct { + status int + chStatusChanged chan *StatusChanged + mutex *sync.Mutex +} + +func NewLifeCycle() *LifeCycle { + return &LifeCycle{ + status: Closed, + mutex: &sync.Mutex{}, + } +} + +func (l *LifeCycle) Status() int { + l.mutex.Lock() + defer l.mutex.Unlock() + return l.status +} + +func (l *LifeCycle) SetStatus(value int) { + l.mutex.Lock() + defer l.mutex.Unlock() + if l.status == value { + return + } + + oldState := l.status + l.status = value + + if l.chStatusChanged == nil { + return + } + l.chStatusChanged <- &StatusChanged{ + From: oldState, + To: value, + } +} diff --git a/rabbitmq_amqp/management.go b/rabbitmq_amqp/management.go new file mode 100644 index 0000000..a6130ec --- /dev/null +++ b/rabbitmq_amqp/management.go @@ -0,0 +1,16 @@ +package rabbitmq_amqp + +import ( + "context" +) + +type IManagement interface { + Open(ctx context.Context, connection IConnection) error + Close(ctx context.Context) error + Queue(queueName string) IQueueSpecification + QueueClientName() IQueueSpecification + GetStatus() int + NotifyStatusChange(channel chan *StatusChanged) + Request(ctx context.Context, body any, path string, method string, + expectedResponseCodes []int) (map[string]any, error) +} diff --git a/rabbitmq_amqp/pkg_suite_test.go b/rabbitmq_amqp/pkg_suite_test.go new file mode 100644 index 0000000..08ee73d --- /dev/null +++ b/rabbitmq_amqp/pkg_suite_test.go @@ -0,0 +1,13 @@ +package rabbitmq_amqp_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestPkg(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Pkg Suite") +}